From 7fd23c54a39de8e112ba1f690282f4dd842f329a Mon Sep 17 00:00:00 2001 From: Andrew Altshuler Date: Fri, 19 Jun 2026 03:34:15 +0300 Subject: [PATCH] fix(cluster): stop cluster-apply crash-loops from the recovery-sidecar trap (#284) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(cluster): stop cluster-apply crash-loops from the recovery-sidecar trap A `cluster apply` carrying a schema change against a graph that has non-main branches, or an unsupported "needs backfill" migration, armed a recovery sidecar *before* calling the engine, then left it behind when the engine rejected the apply pre-movement. The server refuses to boot while any sidecar is pending, and re-running apply re-armed a fresh sidecar — an unescapable crash loop. None of the engine rejections are bugs; the trap is in the apply/serve choreography. Three coordinated changes: 1. Preview before arming the sidecar. `cluster apply` now runs `preview_schema_apply_with_options` before `write_recovery_sidecar`, so parser/planner rejections (non-main branches, unsupported plan) fail loudly without leaving recovery work behind. The post-preview engine error path now deletes the sidecar when the live schema still matches the recorded digest (nothing moved), and keeps it only on real mid-movement failure — both branches covered by new engine-failpoint tests (cluster failpoints now enable omnigraph/failpoints). 2. Per-graph quarantine at serve time instead of whole-cluster refusal. A graph-attributed pending sidecar, an unopenable graph root, a query parse failure, or an unresolvable embedding provider now quarantines just that graph (logged loudly at every boot layer) while healthy graphs serve; `/graphs` lists only ready graphs and quarantined routes 404. Cluster-global problems (missing/unreadable state, malformed or unattributable sidecars, shared-catalog or cluster-policy errors, zero healthy graphs) stay fail-fast. `--require-all-graphs` / OMNIGRAPH_REQUIRE_ALL_GRAPHS=1 restores all-or-nothing boot. 3. Backfill embedding-provider profile metadata on apply. Mirrors the existing policy-binding backfill: a pre-5A ledger missing `embedding_profile` is now detected as a metadata-only change and backfilled by a no-op apply, instead of bricking serve with `embedding_provider_profile_missing` forever. Tests: trap (no sidecar after a rejected apply), both digest-cleanup branches, per-graph quarantine (cluster + server), embedding backfill. Co-Authored-By: Claude Opus 4.8 (1M context) * docs: resilient cluster boot + recovery-sidecar trap fix Amend RFC-005 D4 readiness posture (cluster-global fail-fast vs graph-local quarantine; deviation #5 for --require-all-graphs), add the v0.7.0 release note, and update the user cluster/server/deployment docs and the OMNIGRAPH_REQUIRE_ALL_GRAPHS env var. Co-Authored-By: Claude Opus 4.8 (1M context) * fix(cluster): surface sidecar-cleanup failures; document severity promotion Address Greptile review on PR #284: - The pre-movement sidecar cleanup fast-path discarded `delete_object`'s result, so a transient delete failure left the graph quarantined with no signal. Add `try_delete_object` (Result-returning) and emit a `recovery_sidecar_cleanup_failed` warning diagnostic on failure; the fire-and-forget `delete_object` now delegates to it. - Document why the serve-time loop promotes every `list_recovery_sidecars` diagnostic to a cluster-fatal error (the listing only emits genuine read/parse/version failures, as warnings, whose blast radius serving cannot prove) and note the promote-by-code path if that ever changes. Co-Authored-By: Claude Opus 4.8 (1M context) --------- Co-authored-by: Claude Opus 4.8 (1M context) --- crates/omnigraph-cluster/Cargo.toml | 4 +- crates/omnigraph-cluster/src/diff.rs | 44 ++++ crates/omnigraph-cluster/src/lib.rs | 150 ++++++++++---- crates/omnigraph-cluster/src/serve.rs | 98 +++++++-- crates/omnigraph-cluster/src/store.rs | 9 +- crates/omnigraph-cluster/src/tests.rs | 206 ++++++++++++++++++- crates/omnigraph-cluster/src/types.rs | 11 + crates/omnigraph-cluster/tests/failpoints.rs | 127 ++++++++++-- crates/omnigraph-server/src/lib.rs | 126 ++++++++---- crates/omnigraph-server/src/main.rs | 14 +- crates/omnigraph-server/src/settings.rs | 196 ++++++++++++++---- crates/omnigraph-server/tests/multi_graph.rs | 132 +++++++++++- crates/omnigraph-server/tests/s3.rs | 7 +- crates/omnigraph-server/tests/support/mod.rs | 11 +- docker/entrypoint.sh | 9 +- docs/dev/rfc-005-server-cluster-boot.md | 21 +- docs/releases/v0.7.0.md | 6 + docs/user/clusters/config.md | 43 ++-- docs/user/clusters/index.md | 10 +- docs/user/deployment.md | 1 + docs/user/operations/server.md | 21 +- 21 files changed, 1043 insertions(+), 203 deletions(-) diff --git a/crates/omnigraph-cluster/Cargo.toml b/crates/omnigraph-cluster/Cargo.toml index 05a9308..f0a3a22 100644 --- a/crates/omnigraph-cluster/Cargo.toml +++ b/crates/omnigraph-cluster/Cargo.toml @@ -10,8 +10,8 @@ documentation = "https://docs.rs/omnigraph-cluster" [features] # Fault-injection hooks for the apply protocol (crash-mid-apply, CAS-race -# tests). Deliberately does NOT enable omnigraph/failpoints. -failpoints = ["dep:fail", "fail/failpoints"] +# tests), including cluster/engine boundary failures. +failpoints = ["dep:fail", "fail/failpoints", "omnigraph/failpoints"] [dependencies] omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" } diff --git a/crates/omnigraph-cluster/src/diff.rs b/crates/omnigraph-cluster/src/diff.rs index 516a86e..ce29a45 100644 --- a/crates/omnigraph-cluster/src/diff.rs +++ b/crates/omnigraph-cluster/src/diff.rs @@ -18,6 +18,7 @@ pub(crate) fn diff_resources( disposition: None, reason: None, binding_change: false, + metadata_change: None, migration: None, }), Some(before) if before != after => changes.push(PlanChange { @@ -28,6 +29,7 @@ pub(crate) fn diff_resources( disposition: None, reason: None, binding_change: false, + metadata_change: None, migration: None, }), Some(_) => {} @@ -43,6 +45,7 @@ pub(crate) fn diff_resources( disposition: None, reason: None, binding_change: false, + metadata_change: None, migration: None, }); } @@ -82,6 +85,47 @@ pub(crate) fn append_policy_binding_changes( disposition: None, reason: None, binding_change: true, + metadata_change: Some(PlanMetadataChange::PolicyBindings), + migration: None, + }); + } + changes.sort_by(|a, b| a.resource.cmp(&b.resource)); +} + +/// Metadata-only embedding provider changes: the provider digest is unchanged +/// but the applied state predates storing the profile body needed by +/// config-free serving. This mirrors policy binding backfill instead of +/// hiding a serving-time failure behind a no-op plan. +pub(crate) fn append_embedding_profile_changes( + changes: &mut Vec, + prior_state: Option<&ClusterState>, + desired: &DesiredCluster, +) { + let Some(state) = prior_state else { + return; // no state: provider Creates carry profiles already + }; + for (address, desired_profile) in &desired.embedding_providers { + if changes + .iter() + .any(|change| change.resource.as_str() == address.as_str()) + { + continue; // content change already covers it + } + let Some(entry) = state.applied_revision.resources.get(address) else { + continue; // not applied yet: the Create covers it + }; + if entry.embedding_profile.as_ref() == Some(desired_profile) { + continue; + } + changes.push(PlanChange { + resource: address.clone(), + operation: PlanOperation::Update, + before_digest: Some(entry.digest.clone()), + after_digest: Some(entry.digest.clone()), + disposition: None, + reason: None, + binding_change: false, + metadata_change: Some(PlanMetadataChange::EmbeddingProfile), migration: None, }); } diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 0dad78c..bed27c8 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -33,9 +33,9 @@ use config::{ validate_id, validate_query_source, }; use diff::{ - FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources, - classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs, - diff_resources, resource_kind, + FailedGraphOrigin, ResourceKind, append_embedding_profile_changes, + append_policy_binding_changes, approved_resources, classify_changes, compute_approvals, + compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind, }; pub use serve::{ ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, cluster_graph_ids, @@ -183,6 +183,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { }; if !has_errors(&diagnostics) { append_policy_binding_changes(&mut changes, prior_state.as_ref(), &desired); + append_embedding_profile_changes(&mut changes, prior_state.as_ref(), &desired); } // Plan previews dispositions without sweeping; a pending recovery is // surfaced as the cluster_recovery_pending warning above instead. @@ -404,6 +405,7 @@ pub async fn apply_config_dir_with_options( let prior_resources = state_resource_digests(&state); let mut changes = diff_resources(&prior_resources, &desired.resource_digests); append_policy_binding_changes(&mut changes, Some(&state), &desired); + append_embedding_profile_changes(&mut changes, Some(&state), &desired); let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics).await; let approved = approved_resources( &approval_artifacts, @@ -639,42 +641,9 @@ pub async fn apply_config_dir_with_options( continue; } }; - let observed_manifest_version = match db.snapshot_of(ReadTarget::branch("main")).await { - Ok(snapshot) => Some(snapshot.version()), - Err(_) => None, - }; - let mut sidecar = RecoverySidecar { - schema_version: 1, - operation_id: Ulid::new().to_string(), - started_at: now_rfc3339(), - actor: options.actor.clone(), - kind: RecoverySidecarKind::SchemaApply, - graph_id: graph_id.clone(), - graph_uri: graph_uri.clone(), - observed_manifest_version, - expected_manifest_version: None, - desired_schema_digest: desired_graph.schema_digest.clone(), - state_cas_base: expected_cas.clone(), - approval_id: None, - }; - let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await { - Ok(path) => path, - Err(diagnostic) => { - diagnostics.push(diagnostic); - failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); - graph_moving_aborted = true; - continue; - } - }; - if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_schema_apply") { - // Simulated crash before the engine call: the sidecar stays; the - // sweep retires it next run (ledger still consistent with live). - diagnostics.push(diagnostic); - failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); - graph_moving_aborted = true; - continue; - } - // Re-read + digest-verify the desired schema source under the lock. + // Re-read + digest-verify the desired schema source before the + // cluster sidecar exists. Parser/planner rejections cannot have + // moved graph state, so they must not leave recovery work behind. let schema_source = source_paths .get(schema_address(graph_id).as_str()) .ok_or_else(|| { @@ -708,12 +677,64 @@ pub async fn apply_config_dir_with_options( Ok(source) => source, Err(diagnostic) => { diagnostics.push(diagnostic); - backend.delete_object(&sidecar_path).await; // nothing moved failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); graph_moving_aborted = true; continue; } }; + if let Err(err) = db + .preview_schema_apply_with_options(&schema_source, SchemaApplyOptions::default()) + .await + { + diagnostics.push(Diagnostic::error( + "schema_apply_failed", + schema_address(graph_id), + format!("schema apply is not supported on '{graph_uri}': {err}"), + )); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + let observed_manifest_version = match db.snapshot_of(ReadTarget::branch("main")).await { + Ok(snapshot) => Some(snapshot.version()), + Err(_) => None, + }; + let recorded_schema_digest = state + .applied_revision + .resources + .get(&schema_address(graph_id)) + .map(|entry| entry.digest.clone()); + let mut sidecar = RecoverySidecar { + schema_version: 1, + operation_id: Ulid::new().to_string(), + started_at: now_rfc3339(), + actor: options.actor.clone(), + kind: RecoverySidecarKind::SchemaApply, + graph_id: graph_id.clone(), + graph_uri: graph_uri.clone(), + observed_manifest_version, + expected_manifest_version: None, + desired_schema_digest: desired_graph.schema_digest.clone(), + state_cas_base: expected_cas.clone(), + approval_id: None, + }; + let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await { + Ok(path) => path, + Err(diagnostic) => { + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + }; + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_schema_apply") { + // Simulated crash before the engine call: the sidecar stays; the + // sweep retires it next run (ledger still consistent with live). + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } // Soft drops only: allow_data_loss stays false until the approval // artifacts of stage 4C exist (RFC-004 §D4). match db @@ -736,8 +757,29 @@ pub async fn apply_config_dir_with_options( schema_address(graph_id), format!("schema apply failed on '{graph_uri}': {err}"), )); - // Sidecar stays; the sweep retires it (live digest unchanged - // == ledger consistent) or flags real movement. + if live_schema_matches_recorded_digest( + &graph_uri, + recorded_schema_digest.as_deref(), + observed_manifest_version, + ) + .await + { + // Pre-movement rejection: nothing moved, so retire the + // sidecar eagerly. A delete failure leaves it safe (the + // graph is quarantined until the next sweep), but surface + // it so an operator isn't left debugging a silent stick. + if let Err(err) = backend.try_delete_object(&sidecar_path).await { + diagnostics.push(Diagnostic::warning( + "recovery_sidecar_cleanup_failed", + sidecar_path.clone(), + format!( + "could not delete the stale recovery sidecar after a pre-movement \ + schema-apply rejection; graph `{graph_id}` stays quarantined until \ + a state-mutating cluster command sweeps it: {err}" + ), + )); + } + } failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); graph_moving_aborted = true; continue; @@ -1022,6 +1064,7 @@ pub async fn apply_config_dir_with_options( &desired.resource_digests, ); append_policy_binding_changes(&mut residual, Some(&new_state), &desired); + append_embedding_profile_changes(&mut residual, Some(&new_state), &desired); let converged = residual.is_empty(); if converged { new_state.applied_revision.config_digest = Some(desired.config_digest.clone()); @@ -1939,6 +1982,29 @@ fn embedding_provider_digest(profile: &EmbeddingProviderConfig) -> String { sha256_hex(input.as_bytes()) } +async fn live_schema_matches_recorded_digest( + graph_uri: &str, + recorded_schema_digest: Option<&str>, + observed_manifest_version: Option, +) -> bool { + let Some(recorded_schema_digest) = recorded_schema_digest else { + return false; + }; + let Some(observed_manifest_version) = observed_manifest_version else { + return false; + }; + let Ok(db) = Omnigraph::open_read_only(graph_uri).await else { + return false; + }; + let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await else { + return false; + }; + if snapshot.version() != observed_manifest_version { + return false; + } + sha256_hex(db.schema_source().as_bytes()) == recorded_schema_digest +} + fn desired_config_digest( raw: &RawClusterConfig, resource_digests: &BTreeMap, diff --git a/crates/omnigraph-cluster/src/serve.rs b/crates/omnigraph-cluster/src/serve.rs index 6f89e2d..54d3017 100644 --- a/crates/omnigraph-cluster/src/serve.rs +++ b/crates/omnigraph-cluster/src/serve.rs @@ -37,11 +37,14 @@ pub struct ServingSnapshot { pub graphs: Vec, pub queries: Vec, pub policies: Vec, + pub diagnostics: Vec, } /// Read the applied revision as a serving snapshot — the read-only loader for -/// the Phase-5 server boot. All-or-nothing per RFC-005 §D4: every readiness -/// failure is collected and the whole snapshot refused; no partial serving. +/// the Phase-5 server boot. Cluster-global readiness failures are still +/// all-or-nothing, but graph-attributed pending recovery sidecars quarantine +/// only that graph so healthy graphs can continue serving. This loader never +/// runs a recovery sweep. /// Takes no lock: the state file is replaced atomically, so this reads a /// consistent point-in-time ledger. pub async fn read_serving_snapshot( @@ -190,19 +193,44 @@ async fn read_snapshot_with_store( backend: ClusterStore, ) -> Result> { let mut diagnostics: Vec = Vec::new(); + let mut startup_diagnostics: Vec = Vec::new(); + let mut quarantined_graphs: BTreeSet = BTreeSet::new(); - // A ledger a sweep is about to rewrite must not start serving. + // Do not sweep at serve time. Valid graph-attributed sidecars quarantine + // that graph; malformed/unattributable sidecars remain cluster-fatal + // because serving cannot prove their blast radius. + let sidecar_diag_start = diagnostics.len(); let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await; - if !sidecars.is_empty() { - diagnostics.push(Diagnostic::error( + // Every diagnostic `list_recovery_sidecars` appends is a genuine + // read/parse/version failure (emitted as a warning by `store::list_json_dir`) + // whose blast radius serving cannot prove — promote each to a cluster-fatal + // error. This depends on that listing only ever emitting failure diagnostics; + // if it grows a benign/informational one, promote by code instead. + for diagnostic in diagnostics.iter_mut().skip(sidecar_diag_start) { + diagnostic.severity = DiagnosticSeverity::Error; + } + for (path, sidecar) in sidecars { + if sidecar.graph_id.trim().is_empty() { + diagnostics.push(Diagnostic::error( + "cluster_recovery_unattributed", + path, + "recovery sidecar has no graph id; run a state-mutating cluster command to sweep it before serving", + )); + continue; + } + quarantined_graphs.insert(sidecar.graph_id.clone()); + startup_diagnostics.push(Diagnostic::warning( "cluster_recovery_pending", - CLUSTER_RECOVERIES_DIR, + graph_address(&sidecar.graph_id), format!( - "{} interrupted operation(s) await recovery; run any state-mutating cluster command (e.g. `cluster apply`) to sweep, then retry", - sidecars.len() + "graph `{}` is quarantined because interrupted operation `{}` awaits recovery; run any state-mutating cluster command (e.g. `cluster apply`) to sweep", + sidecar.graph_id, sidecar.operation_id ), )); } + if has_errors(&diagnostics) { + return Err(diagnostics); + } let mut observations = backend.observations(); let state = match backend.read_state(&mut observations).await { @@ -223,14 +251,29 @@ async fn read_snapshot_with_store( } }; let Some(state) = state else { + diagnostics.extend(startup_diagnostics); return Err(diagnostics); }; + let required_embedding_providers: BTreeSet = state + .applied_revision + .resources + .iter() + .filter_map(|(address, entry)| match resource_kind(address) { + ResourceKind::Graph(graph_id) if !quarantined_graphs.contains(&graph_id) => { + entry.embedding_provider.clone() + } + _ => None, + }) + .collect(); let mut embedding_profiles: BTreeMap = BTreeMap::new(); for (address, entry) in &state.applied_revision.resources { if !matches!(resource_kind(address), ResourceKind::EmbeddingProvider(_)) { continue; } + if !required_embedding_providers.contains(address) { + continue; + } let Some(profile) = entry.embedding_profile.clone() else { diagnostics.push(Diagnostic::error( "embedding_provider_profile_missing", @@ -256,9 +299,14 @@ async fn read_snapshot_with_store( let mut graphs = Vec::new(); let mut queries = Vec::new(); let mut policies = Vec::new(); + let mut saw_applied_graph = false; for (address, entry) in &state.applied_revision.resources { match resource_kind(address) { ResourceKind::Graph(graph_id) => { + saw_applied_graph = true; + if quarantined_graphs.contains(&graph_id) { + continue; + } let embedding = match entry.embedding_provider.as_deref() { Some(provider_address) => match resource_kind(provider_address) { ResourceKind::EmbeddingProvider(_) => { @@ -300,6 +348,9 @@ async fn read_snapshot_with_store( let ResourceKind::Query { graph, name } = &kind else { unreachable!() }; + if quarantined_graphs.contains(graph) { + continue; + } match backend .read_verified_payload(&kind, &entry.digest, address) .await @@ -324,6 +375,17 @@ async fn read_snapshot_with_store( )); continue; }; + let applies_to: Vec = applies_to + .into_iter() + .filter(|binding| { + binding + .strip_prefix("graph.") + .is_none_or(|graph| !quarantined_graphs.contains(graph)) + }) + .collect(); + if applies_to.is_empty() { + continue; + } match backend .read_verified_payload(&kind, &entry.digest, address) .await @@ -342,19 +404,29 @@ async fn read_snapshot_with_store( } if graphs.is_empty() { - diagnostics.push(Diagnostic::error( - "cluster_empty", - CLUSTER_STATE_FILE, - "the applied revision records no graphs; apply a cluster with at least one graph before serving from it", - )); + if saw_applied_graph && !quarantined_graphs.is_empty() { + diagnostics.push(Diagnostic::error( + "cluster_no_healthy_graphs", + CLUSTER_RECOVERIES_DIR, + "all applied graphs are quarantined by pending recovery sidecars; run any state-mutating cluster command (e.g. `cluster apply`) to sweep, then retry", + )); + } else { + diagnostics.push(Diagnostic::error( + "cluster_empty", + CLUSTER_STATE_FILE, + "the applied revision records no graphs; apply a cluster with at least one graph before serving from it", + )); + } } if has_errors(&diagnostics) { + diagnostics.extend(startup_diagnostics); return Err(diagnostics); } Ok(ServingSnapshot { graphs, queries, policies, + diagnostics: startup_diagnostics, }) } diff --git a/crates/omnigraph-cluster/src/store.rs b/crates/omnigraph-cluster/src/store.rs index c19a95d..a156d78 100644 --- a/crates/omnigraph-cluster/src/store.rs +++ b/crates/omnigraph-cluster/src/store.rs @@ -250,7 +250,14 @@ impl ClusterStore { /// Best-effort object removal (sidecar retirement after a CAS lands, /// lock cleanup) — failures are recoverable by the next sweep. pub(crate) async fn delete_object(&self, uri: &str) { - let _ = self.adapter.delete(uri).await; + let _ = self.try_delete_object(uri).await; + } + + /// Like `delete_object` but surfaces the failure, so a caller that depends + /// on the deletion (e.g. the pre-movement sidecar cleanup fast-path) can + /// report it as a diagnostic instead of silently leaving stale state. + pub(crate) async fn try_delete_object(&self, uri: &str) -> Result<(), String> { + self.adapter.delete(uri).await.map_err(|err| err.to_string()) } /// Recursive prefix delete for graph roots (approved deletes). Idempotent; diff --git a/crates/omnigraph-cluster/src/tests.rs b/crates/omnigraph-cluster/src/tests.rs index b14b46e..7eae69f 100644 --- a/crates/omnigraph-cluster/src/tests.rs +++ b/crates/omnigraph-cluster/src/tests.rs @@ -1174,6 +1174,19 @@ graphs: .unwrap() } + fn recovery_sidecars(config_dir: &Path) -> Vec { + let dir = config_dir.join(CLUSTER_RECOVERIES_DIR); + if !dir.exists() { + return Vec::new(); + } + let mut sidecars: Vec<_> = fs::read_dir(dir) + .unwrap() + .map(|entry| entry.unwrap().path()) + .collect(); + sidecars.sort(); + sidecars + } + fn query_payload_path(config_dir: &Path, digest: &str) -> std::path::PathBuf { config_dir .join(CLUSTER_RESOURCES_DIR) @@ -1586,8 +1599,17 @@ graphs: state["applied_revision"]["resources"]["schema.knowledge"]["digest"], desired.resource_digests["schema.knowledge"] ); - // Second run: the sweep retires the stale sidecar (ledger consistent) - // and the run fails just as loudly — idempotent loudness. + let db = Omnigraph::open_read_only(&derived_graph_uri(dir.path(), "knowledge")) + .await + .unwrap(); + assert_eq!(db.schema_source().as_str(), SCHEMA); + assert!( + recovery_sidecars(dir.path()).is_empty(), + "{:?}", + recovery_sidecars(dir.path()) + ); + // Second run fails just as loudly and still leaves no sidecar because + // the engine preview rejects before graph state can move. let second = apply_config_dir(dir.path()).await; assert!(!second.ok); assert!( @@ -1596,6 +1618,45 @@ graphs: .iter() .any(|diagnostic| diagnostic.code == "schema_apply_failed") ); + assert!( + recovery_sidecars(dir.path()).is_empty(), + "{:?}", + recovery_sidecars(dir.path()) + ); + } + + #[tokio::test] + async fn apply_schema_update_blocked_by_non_main_branch_leaves_no_sidecar() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + let graph_uri = derived_graph_uri(dir.path(), "knowledge"); + let db = Omnigraph::open(&graph_uri).await.unwrap(); + db.branch_create("feature").await.unwrap(); + drop(db); + let before_state = read_state_json(dir.path()); + fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); + + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(out.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "schema_apply_failed" + && diagnostic + .message + .contains("schema apply requires a graph with only main") + })); + assert!( + recovery_sidecars(dir.path()).is_empty(), + "{:?}", + recovery_sidecars(dir.path()) + ); + let after_state = read_state_json(dir.path()); + assert_eq!( + after_state["applied_revision"]["resources"], + before_state["applied_revision"]["resources"] + ); + let reopened = Omnigraph::open_read_only(&graph_uri).await.unwrap(); + assert_eq!(reopened.schema_source().as_str(), SCHEMA); } #[tokio::test] @@ -2964,6 +3025,10 @@ policies: .find(|change| change.resource == "policy.base") .expect("binding change must be visible in plan"); assert!(change.binding_change); + assert_eq!( + change.metadata_change, + Some(PlanMetadataChange::PolicyBindings) + ); assert_eq!(change.operation, PlanOperation::Update); assert_eq!(change.before_digest, change.after_digest); @@ -3002,9 +3067,9 @@ policies: let plan = plan_config_dir(dir.path()).await; assert!( - plan.changes - .iter() - .any(|change| change.resource == "policy.base" && change.binding_change), + plan.changes.iter().any(|change| change.resource == "policy.base" + && change.binding_change + && change.metadata_change == Some(PlanMetadataChange::PolicyBindings)), "{plan:?}" ); let out = apply_config_dir(dir.path()).await; @@ -3016,6 +3081,52 @@ policies: ); } + #[tokio::test] + async fn pre_5a_state_backfills_embedding_profile() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_mock_embedding_cluster(dir.path(), "recorded-x"); + write_applyable_state(dir.path()); + let converge = apply_config_dir(dir.path()).await; + assert!(converge.converged, "{converge:?}"); + + let mut state = read_state_json(dir.path()); + state["applied_revision"]["resources"]["provider.embedding.default"] + .as_object_mut() + .unwrap() + .remove("embedding_profile"); + fs::write( + dir.path().join(CLUSTER_STATE_FILE), + serde_json::to_string_pretty(&state).unwrap(), + ) + .unwrap(); + + let plan = plan_config_dir(dir.path()).await; + let change = plan + .changes + .iter() + .find(|change| change.resource == "provider.embedding.default") + .expect("embedding profile backfill must be visible in plan"); + assert_eq!(change.operation, PlanOperation::Update); + assert_eq!(change.before_digest, change.after_digest); + assert_eq!( + change.metadata_change, + Some(PlanMetadataChange::EmbeddingProfile) + ); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok && out.converged, "{out:?}"); + let healed = read_state_json(dir.path()); + assert_eq!( + healed["applied_revision"]["resources"]["provider.embedding.default"] + ["embedding_profile"]["model"], + serde_json::json!("recorded-x") + ); + let snapshot = read_serving_snapshot(dir.path()).await.unwrap(); + let profile = snapshot.graphs[0].embedding.as_ref().unwrap(); + assert_eq!(profile.model.as_deref(), Some("recorded-x")); + } + #[tokio::test] async fn bindings_survive_refresh() { let dir = fixture(); @@ -3189,9 +3300,92 @@ policies: let err = read_serving_snapshot(dir.path()).await.unwrap_err(); assert!( - err.iter().any(|diagnostic| diagnostic.code == "cluster_recovery_pending"), + err.iter() + .any(|diagnostic| diagnostic.code == "cluster_no_healthy_graphs"), "{err:?}" ); + assert!( + err.iter().any(|diagnostic| { + diagnostic.code == "cluster_recovery_pending" + && diagnostic.path == "graph.knowledge" + }), + "{err:?}" + ); + } + + #[tokio::test] + async fn serving_snapshot_quarantines_one_graph_with_pending_recovery() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +metadata: + name: test +state: + backend: cluster + lock: true +graphs: + knowledge: + schema: ./people.pg + archive: + schema: ./people.pg +"#, + ) + .unwrap(); + let graph_dir = dir.path().join(CLUSTER_GRAPHS_DIR); + fs::create_dir_all(&graph_dir).unwrap(); + Omnigraph::init( + graph_dir.join("knowledge.omni").to_string_lossy().as_ref(), + SCHEMA, + ) + .await + .unwrap(); + Omnigraph::init( + graph_dir.join("archive.omni").to_string_lossy().as_ref(), + SCHEMA, + ) + .await + .unwrap(); + let desired = validate_config_dir(dir.path()); + assert!(desired.ok, "{:?}", desired.diagnostics); + let schema_digest = desired.resource_digests["schema.knowledge"].clone(); + let empty_queries = BTreeMap::new(); + let knowledge_digest = graph_digest( + "knowledge", + Some(&schema_digest), + Some(&empty_queries), + None, + None, + ); + let archive_digest = graph_digest( + "archive", + Some(&schema_digest), + Some(&empty_queries), + None, + None, + ); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", knowledge_digest.as_str()), + ("schema.knowledge", schema_digest.as_str()), + ("graph.archive", archive_digest.as_str()), + ("schema.archive", schema_digest.as_str()), + ], + ); + write_schema_apply_sidecar(dir.path(), "knowledge", "whatever", "01SERVE2"); + + let snapshot = read_serving_snapshot(dir.path()).await.unwrap(); + assert_eq!(snapshot.graphs.len(), 1); + assert_eq!(snapshot.graphs[0].graph_id, "archive"); + assert!(snapshot.queries.is_empty()); + assert!(snapshot.policies.is_empty()); + assert!(snapshot.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "cluster_recovery_pending" + && diagnostic.path == "graph.knowledge" + && diagnostic.severity == DiagnosticSeverity::Warning + })); } #[tokio::test] diff --git a/crates/omnigraph-cluster/src/types.rs b/crates/omnigraph-cluster/src/types.rs index 97ad406..7687575 100644 --- a/crates/omnigraph-cluster/src/types.rs +++ b/crates/omnigraph-cluster/src/types.rs @@ -176,6 +176,10 @@ pub struct PlanChange { /// pre-5A backfill case). #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub binding_change: bool, + /// Metadata-only updates whose resource content digest is unchanged but + /// whose applied ledger metadata needs to converge. + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata_change: Option, /// For schema updates: the engine's migration plan against the live /// graph (RFC-004 §D7's data-aware preview). Absent when the preview is /// unavailable (warning `schema_preview_unavailable`). @@ -183,6 +187,13 @@ pub struct PlanChange { pub migration: Option, } +#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum PlanMetadataChange { + PolicyBindings, + EmbeddingProfile, +} + #[derive(Debug, Clone, Serialize, PartialEq, Eq)] pub struct BlastRadius { pub resource: String, diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index 5cdf2d4..51997ce 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -13,8 +13,9 @@ use std::fs; use std::path::{Path, PathBuf}; use fail::FailScenario; -use omnigraph_cluster::failpoints::ScopedFailPoint; use omnigraph::db::Omnigraph; +use omnigraph::failpoints::ScopedFailPoint as EngineScopedFailPoint; +use omnigraph_cluster::failpoints::ScopedFailPoint; use omnigraph_cluster::{ ApplyOptions, apply_config_dir, apply_config_dir_with_options, approve_config_dir, validate_config_dir, @@ -178,13 +179,12 @@ async fn apply_cas_race_surfaces_state_cas_mismatch() { // after apply read it but before apply writes. RAII-guarded so a panic // inside apply cannot leak the callback into the global registry. let race_path = state_path(dir.path()); - let failpoint = - ScopedFailPoint::with_callback("cluster_apply.before_state_write", move || { - let mut state: serde_json::Value = - serde_json::from_str(&fs::read_to_string(&race_path).unwrap()).unwrap(); - state["state_revision"] = serde_json::json!(99); - fs::write(&race_path, serde_json::to_string_pretty(&state).unwrap()).unwrap(); - }); + let failpoint = ScopedFailPoint::with_callback("cluster_apply.before_state_write", move || { + let mut state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&race_path).unwrap()).unwrap(); + state["state_revision"] = serde_json::json!(99); + fs::write(&race_path, serde_json::to_string_pretty(&state).unwrap()).unwrap(); + }); let out = apply_config_dir(dir.path()).await; drop(failpoint); @@ -336,10 +336,9 @@ async fn create_crash_after_init_rolls_state_forward() { ); assert!(recovered.converged); assert!(recovery_sidecars(dir.path()).is_empty()); - let state: serde_json::Value = serde_json::from_str( - &fs::read_to_string(dir.path().join("__cluster/state.json")).unwrap(), - ) - .unwrap(); + let state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(dir.path().join("__cluster/state.json")).unwrap()) + .unwrap(); assert!( state["recovery_records"] .as_object() @@ -422,6 +421,105 @@ async fn schema_crash_before_apply_recovers_via_sweep() { scenario.teardown(); } +/// Engine apply fails after cluster preview and sidecar creation, but before +/// the graph manifest moves. The defensive cleanup proof should remove the +/// cluster sidecar immediately so a pre-movement error cannot brick boot. +#[tokio::test] +async fn schema_apply_error_before_graph_movement_removes_sidecar() { + let scenario = FailScenario::setup(); + let dir = fixture(); + converge_with_live_graph(dir.path()).await; + let pre_digest = live_schema_digest(dir.path()).await; + fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); + + { + let _failpoint = EngineScopedFailPoint::new("schema_apply.before_staging_write", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "schema_apply_failed"), + "{:?}", + out.diagnostics + ); + assert_eq!(live_schema_digest(dir.path()).await, pre_digest); + assert!( + recovery_sidecars(dir.path()).is_empty(), + "{:?}", + recovery_sidecars(dir.path()) + ); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok && recovered.converged, "{recovered:?}"); + assert!(recovery_sidecars(dir.path()).is_empty()); + assert_ne!(live_schema_digest(dir.path()).await, pre_digest); + scenario.teardown(); +} + +/// Engine apply fails after the graph manifest moved. The cluster cannot +/// prove this is a pre-movement failure, so the sidecar must survive for +/// explicit recovery/quarantine instead of being cleaned up defensively. +#[tokio::test] +async fn schema_apply_error_after_graph_movement_keeps_sidecar() { + let scenario = FailScenario::setup(); + let dir = fixture(); + converge_with_live_graph(dir.path()).await; + let pre_digest = live_schema_digest(dir.path()).await; + fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); + let desired = validate_config_dir(dir.path()); + let v2_digest = desired.resource_digests["schema.knowledge"].clone(); + + { + let _failpoint = EngineScopedFailPoint::new("schema_apply.after_manifest_commit", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "schema_apply_failed"), + "{:?}", + out.diagnostics + ); + // Read-only opens do not run engine schema-state recovery, so the + // schema file still reads as the old digest even though the manifest + // has moved. The cluster sidecar must remain because movement was + // detected by the fallback manifest-version proof. + assert_eq!(live_schema_digest(dir.path()).await, pre_digest); + let sidecars = recovery_sidecars(dir.path()); + assert_eq!(sidecars.len(), 1, "{sidecars:?}"); + let sidecar: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap(); + assert_eq!(sidecar["kind"], "schema_apply"); + assert!(sidecar["expected_manifest_version"].is_null(), "{sidecar}"); + } + + let uri = dir.path().join("graphs/knowledge.omni"); + let db = Omnigraph::open(uri.to_string_lossy().as_ref()) + .await + .unwrap(); + assert_eq!( + db.schema_source().as_str(), + SCHEMA_V2, + "read-write open should complete engine schema-state recovery" + ); + drop(db); + assert_eq!(live_schema_digest(dir.path()).await, v2_digest); + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!( + recovered + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + assert!(recovered.converged); + assert!(recovery_sidecars(dir.path()).is_empty()); + scenario.teardown(); +} + /// Crash after the engine schema apply, before the state CAS: the manifest /// moved, the ledger is stale, nothing acknowledged; the next run's sweep /// rolls the ledger forward with an audit entry and the run converges. @@ -447,7 +545,10 @@ async fn schema_crash_after_apply_rolls_state_forward() { assert_eq!(sidecars.len(), 1); let sidecar: serde_json::Value = serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap(); - assert!(sidecar["expected_manifest_version"].is_number(), "{sidecar}"); + assert!( + sidecar["expected_manifest_version"].is_number(), + "{sidecar}" + ); } let recovered = apply_config_dir(dir.path()).await; diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 5451b05..fbc37d2 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -1,9 +1,9 @@ pub mod api; mod handlers; mod settings; -pub use settings::{load_server_settings, classify_server_runtime_state, ServerRuntimeState}; -use settings::*; use handlers::*; +use settings::*; +pub use settings::{ServerRuntimeState, classify_server_runtime_state, load_server_settings}; pub mod auth; pub mod graph_id; pub mod identity; @@ -29,10 +29,10 @@ use api::{ BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput, BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput, CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse, - HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest, - InvokeStoredQueryResponse, QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest, - SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output, - schema_apply_output, snapshot_payload, + HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest, InvokeStoredQueryResponse, + QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest, SchemaApplyOutput, + SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output, schema_apply_output, + snapshot_payload, }; pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source}; use axum::body::{Body, Bytes}; @@ -166,6 +166,10 @@ pub struct ServerConfig { /// who set up auth and forgot the policy file would otherwise ship /// the illusion of protection. pub allow_unauthenticated: bool, + /// Operator opt-in for fail-fast cluster boot. By default, graph-local + /// startup failures quarantine that graph and healthy graphs still serve. + /// When true, any quarantined or failed graph aborts startup. + pub require_all_graphs: bool, } /// What `load_server_settings` produces. RFC-011 cluster-only: the @@ -303,7 +307,14 @@ impl AppState { ) -> Self { let bearer_tokens = hash_bearer_tokens(bearer_tokens); let per_graph_policy = policy_engine.map(Arc::new); - Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload), None) + Self::build_single_mode( + uri, + db, + bearer_tokens, + per_graph_policy, + Arc::new(workload), + None, + ) } /// Like `new_single`, but attaches a pre-validated stored-query @@ -420,13 +431,8 @@ impl AppState { bearer_tokens: Vec<(String, String)>, policy_file: Option<&PathBuf>, ) -> Result { - Self::open_single_with_queries( - uri, - bearer_tokens, - policy_file, - QueryRegistry::default(), - ) - .await + Self::open_single_with_queries(uri, bearer_tokens, policy_file, QueryRegistry::default()) + .await } /// Single-mode boot with a stored-query registry: open the engine, @@ -509,8 +515,7 @@ impl AppState { // reserved id `default` — both the registry key and the URL // segment (`/graphs/default/...`). let uri = normalize_root_uri(&uri).unwrap_or(uri); - let graph_id = - GraphId::try_from("default").expect("'default' is a valid GraphId"); + let graph_id = GraphId::try_from("default").expect("'default' is a valid GraphId"); let key = GraphKey::cluster(graph_id); let handle = Arc::new(GraphHandle { key, @@ -889,15 +894,21 @@ pub fn build_app(state: AppState) -> Router { // flagged and their responses include RFC 9745 Deprecation + // RFC 8288 Link headers. Suppress the call-site warning for the // route registration itself. - .route("/read", post({ - #[allow(deprecated)] - server_read - })) + .route( + "/read", + post({ + #[allow(deprecated)] + server_read + }), + ) .route("/query", post(server_query)) - .route("/change", post({ - #[allow(deprecated)] - server_change - })) + .route( + "/change", + post({ + #[allow(deprecated)] + server_change + }), + ) .route("/mutate", post(server_mutate)) .route("/queries", get(server_list_queries)) .route("/queries/{name}", post(server_invoke_query)) @@ -1013,7 +1024,14 @@ pub async fn serve(config: ServerConfig) -> Result<()> { config = %config_path.display(), "serving omnigraph" ); - open_multi_graph_state(graphs, tokens, server_policy.as_ref(), config_path).await? + open_multi_graph_state( + graphs, + tokens, + server_policy.as_ref(), + config_path, + config.require_all_graphs, + ) + .await? } }; @@ -1033,9 +1051,9 @@ fn load_graph_policy(source: &PolicySource, graph_id: &str) -> Result, server_policy_source: Option<&PolicySource>, config_path: PathBuf, + require_all_graphs: bool, ) -> Result { - use futures::{StreamExt, TryStreamExt}; + use futures::StreamExt; if graphs.is_empty() { bail!("multi-graph mode requires at least one graph in the `graphs:` map"); @@ -1058,21 +1077,48 @@ pub async fn open_multi_graph_state( // `Omnigraph::Server::"root"` entity at evaluation time. let server_policy = match server_policy_source { Some(PolicySource::File(path)) => Some(PolicyEngine::load_server(path)?), - Some(PolicySource::Inline(source)) => { - Some(PolicyEngine::load_server_from_source(source)?) - } + Some(PolicySource::Inline(source)) => Some(PolicyEngine::load_server_from_source(source)?), None => None, }; - // `try_collect` propagates the first error eagerly, dropping every - // in-flight open. `buffer_unordered + collect::>` would drain - // the stream before checking errors — incorrect for the docstring's - // "fail-fast" claim and wasteful on S3-backed graphs. - let handles: Vec> = futures::stream::iter(graphs.into_iter()) - .map(|cfg| async move { open_single_graph(cfg).await }) + let configured_graphs = graphs.len(); + let results = futures::stream::iter(graphs.into_iter()) + .map(|cfg| async move { + let graph_id = cfg.graph_id.clone(); + open_single_graph(cfg).await.map_err(|err| (graph_id, err)) + }) .buffer_unordered(4) - .try_collect() - .await?; + .collect::>() + .await; + let mut handles = Vec::new(); + let mut failed = 0usize; + for result in results { + match result { + Ok(handle) => handles.push(handle), + Err((graph_id, err)) => { + failed += 1; + warn!( + graph_id = %graph_id, + error = %err, + "graph quarantined during startup" + ); + } + } + } + if require_all_graphs && failed > 0 { + bail!( + "strict multi-graph startup requires every graph to open ({} configured, {} failed)", + configured_graphs, + failed + ); + } + if handles.is_empty() { + bail!( + "no healthy graphs opened from multi-graph startup config ({} configured, {} failed)", + configured_graphs, + failed + ); + } let workload = workload::WorkloadController::from_env(); let state = AppState::new_multi(handles, tokens, server_policy, workload, Some(config_path)) diff --git a/crates/omnigraph-server/src/main.rs b/crates/omnigraph-server/src/main.rs index 482c9af..c45b77f 100644 --- a/crates/omnigraph-server/src/main.rs +++ b/crates/omnigraph-server/src/main.rs @@ -22,6 +22,11 @@ struct Cli { /// Equivalent to setting `OMNIGRAPH_UNAUTHENTICATED=1`. #[arg(long)] unauthenticated: bool, + /// Fail startup if any applied graph is quarantined or fails to open. + /// By default, graph-local failures are logged and healthy graphs still + /// serve. Equivalent to setting `OMNIGRAPH_REQUIRE_ALL_GRAPHS=1`. + #[arg(long)] + require_all_graphs: bool, } #[tokio::main] @@ -30,7 +35,12 @@ async fn main() -> Result<()> { init_tracing(); let cli = Cli::parse(); - let settings: ServerConfig = - load_server_settings(cli.cluster.as_ref(), cli.bind, cli.unauthenticated).await?; + let settings: ServerConfig = load_server_settings( + cli.cluster.as_ref(), + cli.bind, + cli.unauthenticated, + cli.require_all_graphs, + ) + .await?; serve(settings).await } diff --git a/crates/omnigraph-server/src/settings.rs b/crates/omnigraph-server/src/settings.rs index bb6febd..ae28205 100644 --- a/crates/omnigraph-server/src/settings.rs +++ b/crates/omnigraph-server/src/settings.rs @@ -12,6 +12,7 @@ pub(crate) async fn load_cluster_settings( cluster_dir: &PathBuf, cli_bind: Option, cli_allow_unauthenticated: bool, + cli_require_all_graphs: bool, ) -> Result { // `--cluster` accepts either a config directory (the ledger location is // resolved through cluster.yaml's `storage:` key) or a storage-root URI @@ -28,11 +29,45 @@ pub(crate) async fn load_cluster_settings( .map_err(|diagnostics| { let details = diagnostics .iter() - .map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message)) + .map(|diagnostic| { + format!( + "[{}] {}: {}", + diagnostic.code, diagnostic.path, diagnostic.message + ) + }) .collect::>() .join("\n "); - eyre!("the cluster at '{}' is not ready to serve:\n {details}", cluster_dir.display()) + eyre!( + "the cluster at '{}' is not ready to serve:\n {details}", + cluster_dir.display() + ) })?; + for diagnostic in &snapshot.diagnostics { + warn!( + code = %diagnostic.code, + path = %diagnostic.path, + message = %diagnostic.message, + "cluster startup diagnostic" + ); + } + let env_require_all_graphs = env_flag("OMNIGRAPH_REQUIRE_ALL_GRAPHS"); + let require_all_graphs = cli_require_all_graphs || env_require_all_graphs; + if require_all_graphs && !snapshot.diagnostics.is_empty() { + let details = snapshot + .diagnostics + .iter() + .map(|diagnostic| { + format!( + "[{}] {}: {}", + diagnostic.code, diagnostic.path, diagnostic.message + ) + }) + .collect::>() + .join("\n "); + bail!( + "strict cluster boot requires every applied graph to be ready; startup diagnostics:\n {details}" + ); + } // Bindings -> Cedar slots. The serving pipeline loads one bundle per // graph plus one server-level bundle; stacked bundles per scope are a @@ -69,6 +104,7 @@ pub(crate) async fn load_cluster_settings( } let mut graphs = Vec::new(); + let mut skipped_graphs = Vec::new(); for graph in &snapshot.graphs { let specs: Vec = snapshot .queries @@ -84,40 +120,75 @@ pub(crate) async fn load_cluster_settings( tool_name: None, }) .collect(); - let registry = QueryRegistry::from_specs(specs).map_err(|errors| { - let details = errors - .iter() - .map(|error| error.to_string()) - .collect::>() - .join("\n "); - eyre!( - "stored queries in the applied revision failed to parse:\n {details}\nrun `cluster refresh` then `cluster apply`, and restart" - ) - })?; + let registry = match QueryRegistry::from_specs(specs) { + Ok(registry) => registry, + Err(errors) => { + let details = errors + .iter() + .map(|error| error.to_string()) + .collect::>() + .join("\n "); + warn!( + graph_id = %graph.graph_id, + errors = %details, + "graph quarantined because stored queries failed to parse" + ); + skipped_graphs.push(format!( + "{}: stored queries failed to parse: {details}", + graph.graph_id + )); + continue; + } + }; + let embedding = match graph + .embedding + .as_ref() + .map(|profile| { + profile.resolve().map_err(|err| { + eyre!("embedding provider for graph '{}': {err}", graph.graph_id) + }) + }) + .transpose() + { + Ok(embedding) => embedding, + Err(err) => { + warn!( + graph_id = %graph.graph_id, + error = %err, + "graph quarantined because embedding provider configuration failed" + ); + skipped_graphs.push(format!("{}: {err}", graph.graph_id)); + continue; + } + }; graphs.push(GraphStartupConfig { graph_id: graph.graph_id.clone(), uri: graph.root.to_string_lossy().to_string(), policy: graph_policies.get(&graph.graph_id).cloned(), - embedding: graph - .embedding - .as_ref() - .map(|profile| { - profile.resolve().map_err(|err| { - eyre!("embedding provider for graph '{}': {err}", graph.graph_id) - }) - }) - .transpose()?, + embedding, queries: registry, }); } + if graphs.is_empty() { + let skipped = skipped_graphs.join(", "); + bail!( + "the cluster at '{}' has no healthy graphs to serve{}", + cluster_dir.display(), + if skipped.is_empty() { + String::new() + } else { + format!(" (quarantined: {skipped})") + } + ); + } + if require_all_graphs && !skipped_graphs.is_empty() { + bail!( + "strict cluster boot requires every graph to build startup settings (quarantined: {})", + skipped_graphs.join(", ") + ); + } - let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED") - .ok() - .map(|v| { - let trimmed = v.trim(); - !trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false") - }) - .unwrap_or(false); + let env_unauth = env_flag("OMNIGRAPH_UNAUTHENTICATED"); Ok(ServerConfig { mode: ServerConfigMode::Multi { @@ -127,6 +198,7 @@ pub(crate) async fn load_cluster_settings( }, bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()), allow_unauthenticated: cli_allow_unauthenticated || env_unauth, + require_all_graphs, }) } @@ -138,6 +210,7 @@ pub async fn load_server_settings( cli_cluster: Option<&PathBuf>, cli_bind: Option, cli_allow_unauthenticated: bool, + cli_require_all_graphs: bool, ) -> Result { let Some(cluster_dir) = cli_cluster else { bail!( @@ -147,7 +220,23 @@ pub async fn load_server_settings( was removed in RFC-011." ); }; - load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated).await + load_cluster_settings( + cluster_dir, + cli_bind, + cli_allow_unauthenticated, + cli_require_all_graphs, + ) + .await +} + +fn env_flag(name: &str) -> bool { + std::env::var(name) + .ok() + .map(|v| { + let trimmed = v.trim(); + !trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false") + }) + .unwrap_or(false) } /// MR-723 server runtime state, classified from the three-state matrix @@ -240,7 +329,9 @@ pub(crate) fn read_bearer_tokens_file(path: &str) -> Result) -> Result> { +pub(crate) fn validate_bearer_tokens( + entries: Vec<(String, String)>, +) -> Result> { let mut seen_actors = HashSet::new(); let mut seen_tokens = HashSet::new(); let mut normalized = Vec::with_capacity(entries.len()); @@ -301,11 +392,18 @@ mod tests { /// as 404 without also masking a 401/500. Pins each outcome. #[test] fn authorize_splits_decision_from_operational_error() { - use super::{Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor, authorize}; + use super::{ + Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor, + authorize, + }; use std::sync::Arc; fn req(action: PolicyAction) -> PolicyRequest { - PolicyRequest { action, branch: None, target_branch: None } + PolicyRequest { + action, + branch: None, + target_branch: None, + } } let actor = ResolvedActor::cluster_static(Arc::from("act-alice")); @@ -345,7 +443,11 @@ mod tests { authorize( Some(&actor), Some(&engine), - PolicyRequest { action: PolicyAction::Read, branch: Some("main".to_string()), target_branch: None }, + PolicyRequest { + action: PolicyAction::Read, + branch: Some("main".to_string()), + target_branch: None + }, ) .unwrap(), Authz::Allowed @@ -354,11 +456,17 @@ mod tests { match authorize( Some(&actor), Some(&engine), - PolicyRequest { action: PolicyAction::Change, branch: Some("main".to_string()), target_branch: None }, + PolicyRequest { + action: PolicyAction::Change, + branch: Some("main".to_string()), + target_branch: None, + }, ) .unwrap() { - Authz::Denied(message) => assert!(!message.is_empty(), "a deny carries its decision message"), + Authz::Denied(message) => { + assert!(!message.is_empty(), "a deny carries its decision message") + } Authz::Allowed => panic!("change must be denied: only read is allowed"), } // Policy installed but no actor → operational failure (`Err`), NOT a @@ -397,8 +505,7 @@ mod tests { }; // Empty registry → nothing attached, no error. - let empty = - super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap(); + let empty = super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap(); assert!(empty.is_none()); // A query that type-checks → attached. @@ -407,7 +514,11 @@ mod tests { "query find_user() { match { $u: User } return { $u.name } }", )]) .unwrap(); - assert!(super::validate_and_attach(ok, &catalog, "g").unwrap().is_some()); + assert!( + super::validate_and_attach(ok, &catalog, "g") + .unwrap() + .is_some() + ); // A query referencing a type the schema lacks → boot refusal that // names both the graph label and the offending query. @@ -420,7 +531,10 @@ mod tests { let msg = err.to_string(); assert!(msg.contains("graph-x"), "labels the graph: {msg}"); assert!(msg.contains("ghost"), "names the query: {msg}"); - assert!(msg.contains("schema check"), "mentions the schema check: {msg}"); + assert!( + msg.contains("schema check"), + "mentions the schema check: {msg}" + ); } #[test] @@ -451,7 +565,7 @@ mod tests { async fn server_settings_require_cluster_boot_source() { // RFC-011 cluster-only: with no --cluster the server refuses to // start and names the cluster-required remedy. - let error = super::load_server_settings(None, None, false) + let error = super::load_server_settings(None, None, false, false) .await .unwrap_err(); assert!( @@ -534,6 +648,7 @@ mod tests { }, bind: "127.0.0.1:0".to_string(), allow_unauthenticated: false, + require_all_graphs: false, }; let result = serve(config).await; let err = result @@ -586,6 +701,7 @@ mod tests { }, bind: "127.0.0.1:0".to_string(), allow_unauthenticated: false, + require_all_graphs: false, }; let result = serve(config).await; let err = diff --git a/crates/omnigraph-server/tests/multi_graph.rs b/crates/omnigraph-server/tests/multi_graph.rs index 617cc66..5679aef 100644 --- a/crates/omnigraph-server/tests/multi_graph.rs +++ b/crates/omnigraph-server/tests/multi_graph.rs @@ -13,7 +13,6 @@ use serde_json::Value; use serial_test::serial; use tower::ServiceExt; - mod support; use support::*; @@ -414,7 +413,7 @@ async fn cluster_boot_serves_applied_state() { assert!(server_policy.is_none()); let state = - omnigraph_server::open_multi_graph_state(graphs, Vec::new(), None, config_path) + omnigraph_server::open_multi_graph_state(graphs, Vec::new(), None, config_path, false) .await .unwrap(); let app = build_app(state); @@ -424,7 +423,10 @@ async fn cluster_boot_serves_applied_state() { // GET /graphs refuses even in cluster mode. let (status, body) = json_response( &app, - Request::builder().uri("/graphs").body(Body::empty()).unwrap(), + Request::builder() + .uri("/graphs") + .body(Body::empty()) + .unwrap(), ) .await; assert_eq!(status, StatusCode::FORBIDDEN, "{body}"); @@ -460,6 +462,115 @@ async fn cluster_boot_serves_applied_state() { assert_eq!(status, StatusCode::OK, "{body}"); } +#[tokio::test] +async fn cluster_boot_quarantines_graph_open_failures() { + let temp = tempfile::tempdir().unwrap(); + let schema = "\nnode Person {\n name: String @key\n}\n"; + let good_uri = temp.path().join("good.omni"); + Omnigraph::init(good_uri.to_string_lossy().as_ref(), schema) + .await + .unwrap(); + let bad_uri = temp.path().join("missing.omni"); + let server_policy = omnigraph_server::PolicySource::Inline( + r#" +version: 1 +kind: server +groups: + admins: [act-admin] +rules: + - id: admins-list-graphs + allow: + actors: { group: admins } + actions: [graph_list] +"# + .to_string(), + ); + let graphs = vec![ + omnigraph_server::GraphStartupConfig { + graph_id: "broken".to_string(), + uri: bad_uri.to_string_lossy().to_string(), + policy: None, + embedding: None, + queries: stored_query_registry(&[]), + }, + omnigraph_server::GraphStartupConfig { + graph_id: "good".to_string(), + uri: good_uri.to_string_lossy().to_string(), + policy: None, + embedding: None, + queries: stored_query_registry(&[]), + }, + ]; + let strict_err = match omnigraph_server::open_multi_graph_state( + graphs.clone(), + vec![("act-admin".to_string(), "admin-token".to_string())], + Some(&server_policy), + temp.path().join("cluster.yaml"), + true, + ) + .await + { + Ok(_) => panic!("strict startup should reject a failed graph open"), + Err(err) => err, + }; + assert!( + strict_err + .to_string() + .contains("strict multi-graph startup requires every graph to open"), + "{strict_err}" + ); + let state = omnigraph_server::open_multi_graph_state( + graphs, + vec![("act-admin".to_string(), "admin-token".to_string())], + Some(&server_policy), + temp.path().join("cluster.yaml"), + false, + ) + .await + .unwrap(); + let mut ready: Vec<_> = state + .routing() + .registry + .list() + .iter() + .map(|handle| handle.key.graph_id.as_str().to_string()) + .collect(); + ready.sort(); + assert_eq!(ready, vec!["good"]); + let app = build_app(state); + + let (status, body) = json_response( + &app, + Request::builder() + .uri("/graphs") + .header("authorization", "Bearer admin-token") + .body(Body::empty()) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::OK, "{body}"); + assert_eq!( + body["graphs"] + .as_array() + .unwrap() + .iter() + .map(|graph| graph["graph_id"].as_str().unwrap()) + .collect::>(), + vec!["good"] + ); + + let (status, body) = json_response( + &app, + Request::builder() + .uri("/graphs/broken/queries") + .header("authorization", "Bearer admin-token") + .body(Body::empty()) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::NOT_FOUND, "{body}"); +} + #[tokio::test(flavor = "multi_thread")] #[serial] async fn cluster_boot_injects_embedding_provider_config() { @@ -555,6 +666,7 @@ graphs: Vec::new(), server_policy.as_ref(), config_path, + false, ) .await .unwrap(); @@ -665,7 +777,10 @@ async fn cluster_boot_wires_policy_bindings_into_cedar_slots() { .unwrap(); fs::write( temp.path().join("cluster.policy.yaml"), - permit_all_policy_yaml(&["default"]).replace("protected_branches: [main]\n", "protected_branches: [main]\nkind: server\n"), + permit_all_policy_yaml(&["default"]).replace( + "protected_branches: [main]\n", + "protected_branches: [main]\nkind: server\n", + ), ) .unwrap(); fs::write( @@ -719,7 +834,7 @@ graphs: async fn cluster_boot_refusals() { // RFC-011 cluster-only: with no --cluster, boot refuses with the // cluster-required remedy. - let err = omnigraph_server::load_server_settings(None, None, true) + let err = omnigraph_server::load_server_settings(None, None, true, false) .await .unwrap_err(); assert!(err.to_string().contains("boots from a cluster"), "{err}"); @@ -729,7 +844,12 @@ async fn cluster_boot_refusals() { // Tampered catalog blob refuses boot with the remedy. let blob_dir = dir.join("__cluster/resources/query/knowledge/find_person"); - let blob = fs::read_dir(&blob_dir).unwrap().next().unwrap().unwrap().path(); + let blob = fs::read_dir(&blob_dir) + .unwrap() + .next() + .unwrap() + .unwrap() + .path(); fs::write(&blob, "tampered").unwrap(); let err = cluster_settings(&dir).await.unwrap_err(); assert!( diff --git a/crates/omnigraph-server/tests/s3.rs b/crates/omnigraph-server/tests/s3.rs index 99bf98d..793d79d 100644 --- a/crates/omnigraph-server/tests/s3.rs +++ b/crates/omnigraph-server/tests/s3.rs @@ -11,7 +11,6 @@ use omnigraph_server::api::ReadRequest; use omnigraph_server::{AppState, build_app}; use serde_json::json; - mod support; use support::*; @@ -137,6 +136,7 @@ async fn server_boots_cluster_from_bare_storage_uri_and_serves_query() { Some(&std::path::PathBuf::from(&root)), None, true, + false, ) .await .unwrap(); @@ -153,6 +153,7 @@ async fn server_boots_cluster_from_bare_storage_uri_and_serves_query() { Vec::new(), server_policy.as_ref(), config_path, + false, ) .await .unwrap(); @@ -170,7 +171,9 @@ async fn server_boots_cluster_from_bare_storage_uri_and_serves_query() { .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); - let bytes = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap(); + let bytes = axum::body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); let value: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); assert_eq!(value["rows"][0]["p.name"], "Ada", "{value}"); } diff --git a/crates/omnigraph-server/tests/support/mod.rs b/crates/omnigraph-server/tests/support/mod.rs index 157c58e..694db46 100644 --- a/crates/omnigraph-server/tests/support/mod.rs +++ b/crates/omnigraph-server/tests/support/mod.rs @@ -15,15 +15,12 @@ use omnigraph::db::{Omnigraph, ReadTarget}; use omnigraph::error::OmniError; use omnigraph::loader::{LoadMode, load_jsonl}; use omnigraph_policy::{PolicyChecker, PolicyEngine}; -use omnigraph_server::api::{ - BranchCreateRequest, BranchMergeRequest, ChangeRequest, ReadRequest, -}; +use omnigraph_server::api::{BranchCreateRequest, BranchMergeRequest, ChangeRequest, ReadRequest}; use omnigraph_server::queries::{QueryRegistry, RegistrySpec}; use omnigraph_server::{AppState, build_app}; use serde_json::{Value, json}; use tower::ServiceExt; - pub const MUTATION_QUERIES: &str = r#" query insert_person($name: String, $age: I32) { insert Person { name: $name, age: $age } @@ -1198,6 +1195,8 @@ graphs: temp } -pub async fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result { - omnigraph_server::load_server_settings(Some(&dir.to_path_buf()), None, true).await +pub async fn cluster_settings( + dir: &Path, +) -> color_eyre::eyre::Result { + omnigraph_server::load_server_settings(Some(&dir.to_path_buf()), None, true, false).await } diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 98587aa..79d7de7 100644 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -17,7 +17,12 @@ if [ -n "${OMNIGRAPH_CLUSTER:-}" ]; then echo "OMNIGRAPH_CLUSTER is an exclusive boot source; unset OMNIGRAPH_TARGET_URI/OMNIGRAPH_CONFIG/OMNIGRAPH_TARGET" >&2 exit 64 fi - exec "$SERVER_BIN" --cluster "${OMNIGRAPH_CLUSTER}" --bind "${bind}" + set -- --cluster "${OMNIGRAPH_CLUSTER}" --bind "${bind}" + case "${OMNIGRAPH_REQUIRE_ALL_GRAPHS:-}" in + ""|0|false|FALSE) ;; + *) set -- "$@" --require-all-graphs ;; + esac + exec "$SERVER_BIN" "$@" fi # URI comes from the env var (the positional arg wins over any config @@ -46,6 +51,8 @@ omnigraph-server container startup requires one of: Optional: - OMNIGRAPH_BIND (default: 0.0.0.0:8080) + - OMNIGRAPH_REQUIRE_ALL_GRAPHS (cluster mode: fail startup unless every + applied graph is healthy) - OMNIGRAPH_TARGET (used with OMNIGRAPH_CONFIG) - OMNIGRAPH_CONFIG (may also accompany OMNIGRAPH_TARGET_URI to add a policy file; the URI still comes from OMNIGRAPH_TARGET_URI) diff --git a/docs/dev/rfc-005-server-cluster-boot.md b/docs/dev/rfc-005-server-cluster-boot.md index 85df875..6c57bba 100644 --- a/docs/dev/rfc-005-server-cluster-boot.md +++ b/docs/dev/rfc-005-server-cluster-boot.md @@ -1,7 +1,7 @@ # RFC: Server Boots from Cluster State — Phase 5 of the Cluster Control Plane **Status:** Landed (5A policy bindings #175; 5B/5C the `--cluster` boot mode — one PR) -**Implementation deviations:** (1) cluster mode reuses `ServerConfigMode::Multi` (a new settings *source*, not a new enum variant; `config_path` carries the cluster dir). (2) Stored queries load via `QueryRegistry::from_specs` from verified blob *content*, not blob paths. (3) More than one policy bundle binding a single scope is a boot error (the serving pipeline holds one bundle per graph + one server-level; stacking is a later slice). (4) `GET /graphs` keeps its closed-by-default contract — without a cluster-bound bundle there is no server-level Cedar engine, so enumeration refuses. +**Implementation deviations:** (1) cluster mode reuses `ServerConfigMode::Multi` (a new settings *source*, not a new enum variant; `config_path` carries the cluster dir). (2) Stored queries load via `QueryRegistry::from_specs` from verified blob *content*, not blob paths. (3) More than one policy bundle binding a single scope is a boot error (the serving pipeline holds one bundle per graph + one server-level; stacking is a later slice). (4) `GET /graphs` keeps its closed-by-default contract — without a cluster-bound bundle there is no server-level Cedar engine, so enumeration refuses. (5) Graph-attributed startup failures quarantine that graph by default; operators can restore all-or-nothing boot with `--require-all-graphs` / `OMNIGRAPH_REQUIRE_ALL_GRAPHS=1`. **Date:** 2026-06-10 **Builds on:** Phase 4 complete ([rfc-004-cluster-graph-schema-apply.md](rfc-004-cluster-graph-schema-apply.md), Landed): `cluster apply` converges graphs, schemas, stored queries, and policies into the cluster catalog. Normative context: [cluster-config-specs.md](cluster-config-specs.md) (the migration model's "window 2"), [cluster-axioms.md](cluster-axioms.md) (axiom 15), [cluster-config-implementation-spec.md](cluster-config-implementation-spec.md) (Phase 5 rollout, Compatibility Stance #7–#9, exit criterion 7). **Target release:** unversioned (phased — see Sequencing). @@ -46,8 +46,8 @@ Mode inference gains rule 0: `--cluster ` → **Cluster mode**, which is al `load_server_settings` grows a cluster branch that reads, in order: -1. `__cluster/state.json` — **missing state is a boot error** ("run `cluster import` + `cluster apply` first"). Pending recovery sidecars under `__cluster/recoveries/` are also a boot error (`cluster_recovery_pending`): a server must not start serving a ledger that a sweep is about to rewrite. -2. **Graph set** = state's `graph.` resources (tombstoned graphs are absent by construction). Each graph's URI is the derived root `/graphs/.omni`. A recorded graph whose root does not open is a boot error — same fail-fast posture as today's bad URI. +1. `__cluster/state.json` — **missing state is a boot error** ("run `cluster import` + `cluster apply` first"). Invalid or unattributable recovery sidecars under `__cluster/recoveries/` are also a boot error: a server must not start if it cannot prove the blast radius. Valid graph-attributed sidecars quarantine that graph by default and are logged as `cluster_recovery_pending`; `--require-all-graphs` promotes them back to a boot error. +2. **Graph set** = state's `graph.` resources (tombstoned graphs are absent by construction). Each graph's URI is the derived root `/graphs/.omni`. A recorded graph whose root does not open quarantines that graph by default; `--require-all-graphs` restores the original fail-fast posture. 3. **Stored queries** = state's `query..` entries, content loaded from the catalog blob at the recorded digest. Blob-missing or digest-mismatched is a boot error (the catalog verification semantics from Stage 3B, applied at boot). Queries type-check at engine open exactly as today (`validate_and_attach` — unchanged). 4. **Policies** = state's `policy.` entries, content from catalog blobs, bindings from the applied metadata of D3: bundles bound to `cluster` load as the server-level Cedar engine (`PolicyEngine::load_server`); bundles bound to graphs load per-graph (`PolicyEngine::load_graph`) and install via `with_policy` — the existing two-gate structure, unchanged. 5. `cluster.yaml` is parsed **only** to validate that the directory is a cluster root (and for nothing else — explicitly not for resource content; a divergence between desired config and applied state is *served as applied*, visible via `cluster plan`). @@ -76,16 +76,19 @@ State's `StateResource` records only a digest. To make the ledger serving-suffic ### D4. Readiness and failure posture -Boot is fail-fast, matching the server's existing stance (bad policy YAML refuses boot): +Cluster-global failures are fail-fast, matching the server's existing stance (bad policy YAML refuses boot). Graph-local failures quarantine the affected graph by default so a single bad graph cannot crash-loop an otherwise healthy cluster. Operators who prefer the original all-or-nothing contract pass `--require-all-graphs` or set `OMNIGRAPH_REQUIRE_ALL_GRAPHS=1`, which promotes every graph-local quarantine/open/settings failure to a boot error. | Condition | Behavior | |---|---| | `state.json` missing / unparseable / unsupported version | boot error | -| pending recovery sidecars | boot error (run any state-mutating cluster command to sweep) | -| recorded graph root missing or unopenable | boot error | +| invalid/unreadable/unattributable recovery sidecars | boot error (run any state-mutating cluster command to sweep or inspect) | +| valid graph-attributed recovery sidecars | quarantine that graph; strict mode boot error | +| recorded graph root missing or unopenable | quarantine that graph; strict mode boot error | | query/policy blob missing or digest-mismatched | boot error (run `cluster refresh` + `apply` to self-heal, then restart) | | policy entry without `applies_to` metadata | boot error ("re-run cluster apply", D3) | -| stored query fails type-check against the live schema | boot error (existing `validate_and_attach` behavior) | +| stored query fails parse/type-check against the live schema | quarantine that graph; strict mode boot error | +| embedding provider configuration for one graph cannot resolve | quarantine that graph; strict mode boot error | +| every applied graph is quarantined or fails startup | boot error (`cluster_no_healthy_graphs`) | | state lock held | **not** an error — boot takes no lock; it reads a point-in-time snapshot of an immutable-once-written state file (the CAS discipline means a concurrent apply produces a *new* file atomically; the server reads whichever was current at open) | ### D5. The `mcp.expose` bridge in cluster mode @@ -109,7 +112,7 @@ Rollback is the same switch in reverse — nothing in cluster mode mutates `omni - *Axiom 5*: the server serves deployed reality (applied digests), never desired intent; D3 keeps the ledger the single serving source. - *Axiom 12*: boot reads without the lock but relies on the atomic-replace write discipline; it never writes state. - *Axiom 14 / Stance #9*: the expose-all bridge is named, scoped to cluster mode, and carries its Phase 6 sunset. -- *Loud failures (deny-list)*: every degraded condition is a typed boot error with a remedy; no partial serving, no silent fallback to the yaml. +- *Loud failures (deny-list)*: every degraded condition is either a typed cluster-global boot error with a remedy or an explicit graph quarantine logged at startup; no silent fallback to the yaml. `--require-all-graphs` is the opt-in all-or-nothing mode for operators who treat any degraded graph as fatal. - *Respect the boundaries*: `omnigraph-cluster` stays free of HTTP; the server reads the catalog through a small read-only loader (either a `pub` read surface on `omnigraph-cluster` or a thin module in the server consuming the documented file formats — implementation picks the one that keeps `omnigraph-cluster` dependency-light; the state/blob formats are already a documented contract). ## Sequencing @@ -117,7 +120,7 @@ Rollback is the same switch in reverse — nothing in cluster mode mutates `omni | Slice | Scope | Gate | |---|---|---| | **5A: serving metadata in state** | `applies_to` recorded on policy resources at apply + sweep roll-forward; additive state schema; `status`/plan surfacing | In-crate tests: metadata written/rolled-forward; old state parses; re-apply backfills | -| **5B: `--cluster` boot mode** | Flag + mode inference rule 0; catalog loader (state → `GraphStartupConfig`s + registries + policy engines); readiness table; OpenAPI regen if surface shifts | Server tests: boot from a converged fixture dir, serve `/graphs/{id}/query` + stored queries + Cedar gates; every D4 row refuses boot; e2e: `cluster apply` then serve — "applied means serving" | +| **5B: `--cluster` boot mode** | Flag + mode inference rule 0; catalog loader (state → `GraphStartupConfig`s + registries + policy engines); readiness table; OpenAPI regen if surface shifts | Server tests: boot from a converged fixture dir, serve `/graphs/{id}/query` + stored queries + Cedar gates; D4 cluster-global rows refuse boot; graph-local rows quarantine by default and refuse under `--require-all-graphs`; e2e: `cluster apply` then serve — "applied means serving" | | **5C: docs + caveat retirement** | `cluster-config.md` mode-switch section; `server.md`/`deployment.md`; retire the "not serving" caveats for cluster-mode deployments; migration guide (D6) | `check-agents-md.sh`; doc accuracy review | ## Exit-criteria coverage diff --git a/docs/releases/v0.7.0.md b/docs/releases/v0.7.0.md index b4ad903..24cefdf 100644 --- a/docs/releases/v0.7.0.md +++ b/docs/releases/v0.7.0.md @@ -36,6 +36,12 @@ get faster and self-healing, and text embedding becomes provider-independent. single-graph flat-route mode, positional-`` boot, and `omnigraph.yaml` `graphs:`-map boot are gone — add or remove graphs with `cluster apply` and restart. +- **Resilient cluster boot with strict opt-out.** Graph-attributed startup + failures now quarantine that graph and let healthy graphs serve; `/graphs` + lists only ready graphs, and quarantined graph routes return 404. Cluster- + global failures still refuse boot, and `--require-all-graphs` (or + `OMNIGRAPH_REQUIRE_ALL_GRAPHS=1`) restores fail-fast all-or-nothing startup + for operators who prefer any degraded graph to abort the process. - **One storage substrate + recovery liveness.** The cluster storage backend and the engine both go through one `StorageAdapter` (versioned read, conditional replace/CAS, prefix delete), exercised by a storage fault-injection matrix. diff --git a/docs/user/clusters/config.md b/docs/user/clusters/config.md index cd4d772..04811ec 100644 --- a/docs/user/clusters/config.md +++ b/docs/user/clusters/config.md @@ -231,9 +231,11 @@ Policy entries additionally record their applied `applies_to` bindings as normalized typed refs — the state ledger is serving-sufficient for the future server-boot stage. A change to `applies_to` alone (the policy file digest unchanged) appears in the plan as an Update marked `binding_change` -(human output: `[bindings]`), applies like any catalog change, and counts -toward convergence; ledgers written before this field existed are backfilled -by the next apply. +(human output: `[bindings]`), and as `metadata_change: policy_bindings` in +structured output. Embedding provider entries similarly carry their resolved +profile in the ledger; pre-profile ledgers are backfilled by an Update with +`metadata_change: embedding_profile`. These metadata-only updates apply like +catalog changes and count toward convergence. Each plan change carries a `disposition` field — an honest preview of what `cluster apply` will do with it in this stage: `applied` (executes), `derived` @@ -322,7 +324,9 @@ cluster apply until the approval-artifact stage. Unsupported migrations (e.g. changing a property's type), engine lock contention, or graphs with user branches fail loudly as `schema_apply_failed` with the engine's message; dependent changes are demoted to `blocked` and graph-moving work stops for -the run. +the run. These pre-movement failures are checked before the cluster schema +recovery sidecar is created, so they do not leave stale recovery files behind +or brick later server boot. `cluster plan` previews schema updates with the engine's real migration plan: each schema change carries a `migration` field (`supported` + typed steps), @@ -402,20 +406,29 @@ drift is visible. Routing is always multi-graph (`/graphs/{id}/...`). Bearer tokens and the bind address stay process-level (flags/env) — they are per-replica facts, not cluster facts. -Boot is fail-fast: missing or unreadable state, pending recovery sidecars, -missing/tampered catalog blobs, policy entries without binding metadata -(pre-binding ledgers — re-run `cluster apply`), an empty graph set, more than -one policy bundle binding a single scope (split or merge bundles; stacked -scopes are a later stage), unopenable graph roots, and stored queries that no -longer type-check all refuse startup with a remedy. A held state lock is -*not* an error — boot reads the atomically-replaced state file without +Boot is fail-fast for cluster-global readiness failures: missing or +unreadable state, invalid/unattributable recovery sidecars, +missing/tampered shared catalog blobs, policy entries without binding +metadata (pre-binding ledgers — re-run `cluster apply`), an empty graph set, +more than one policy bundle binding a single scope (split or merge bundles; +stacked scopes are a later stage), cluster policy problems, or zero healthy +graphs. Valid graph-attributed recovery sidecars, unopenable graph roots, and +stored queries that no longer type-check quarantine that graph instead; the +server logs startup diagnostics, skips the graph's queries and graph-only +policy bindings, and serves any remaining healthy graphs. A held state lock +is *not* an error — boot reads the atomically-replaced state file without locking. +Use `omnigraph-server --require-all-graphs` (or +`OMNIGRAPH_REQUIRE_ALL_GRAPHS=1`) when degraded serving is not acceptable; it +promotes every graph-local quarantine or startup failure back to a boot error. + Serving is static per process: the server reads the applied revision once at -startup, so picking up newly applied state means restarting it. Stored -queries are all listed in `GET /queries` in cluster mode (the cluster -registry has no expose flag; exposure becomes a policy decision in a later -phase). +startup, so picking up newly applied state means restarting it. `GET /graphs` +lists only ready/served graphs; quarantined graphs are omitted and their +routes return 404. Stored queries are all listed in `GET /queries` in cluster +mode (the cluster registry has no expose flag; exposure becomes a policy +decision in a later phase). ## Status diff --git a/docs/user/clusters/index.md b/docs/user/clusters/index.md index 0c2e7d7..089fd4b 100644 --- a/docs/user/clusters/index.md +++ b/docs/user/clusters/index.md @@ -221,7 +221,8 @@ applied revision is not safely servable. Each refusal names its remedy: | Boot error | Meaning | Remedy | |---|---|---| | `cluster_state_missing` | no ledger | `cluster import`, then `apply` | -| `cluster_recovery_pending` | interrupted operation awaiting sweep | run `cluster apply` (or any state-mutating command), restart | +| `cluster_recovery_pending` | graph was quarantined because an interrupted operation awaits sweep | run `cluster apply` (or any state-mutating command), restart | +| `cluster_no_healthy_graphs` | every applied graph is quarantined or failed startup | sweep/fix the graph-specific failures, then restart | | `catalog_payload_missing` / `…_digest_mismatch` | catalog blob lost or tampered | `cluster refresh`, then `apply`, restart | | `policy_bindings_missing` | ledger predates binding metadata | re-run `cluster apply` (backfills), restart | | `cluster_empty` | applied revision has no graphs | apply a cluster with ≥1 graph | @@ -231,6 +232,13 @@ A held *state lock* is deliberately **not** a boot error — the server reads the atomically-replaced ledger without locking, so serving never contends with an in-flight apply. +When at least one graph is healthy, graph-attributed recovery sidecars and +graph-local startup failures do not block the whole server. The affected +graph is skipped, its graph-only policy bindings and queries are omitted, +and `/graphs` lists only the ready graphs. Pass +`omnigraph-server --require-all-graphs` or set +`OMNIGRAPH_REQUIRE_ALL_GRAPHS=1` to make any such quarantine fail startup. + ## 6. Deployment patterns - **Replicas**: any number of `--cluster` servers can serve the same config diff --git a/docs/user/deployment.md b/docs/user/deployment.md index a0d8e9f..1772b9a 100644 --- a/docs/user/deployment.md +++ b/docs/user/deployment.md @@ -208,6 +208,7 @@ When no positional args are given, the image entrypoint |---|---| | `OMNIGRAPH_CLUSTER` | Cluster boot source — a config directory or a storage-root URI, forwarded as `--cluster`. The only boot source. | | `OMNIGRAPH_BIND` | Listen address (default `0.0.0.0:8080`). | +| `OMNIGRAPH_REQUIRE_ALL_GRAPHS` | When truthy, forwarded as `--require-all-graphs`: any graph-local quarantine or startup failure aborts cluster boot instead of serving the healthy subset. | Per-graph and server-level Cedar policy come from the cluster's applied revision (authored in `cluster.yaml` and published with `cluster apply`), diff --git a/docs/user/operations/server.md b/docs/user/operations/server.md index ced9d0d..18032e9 100644 --- a/docs/user/operations/server.md +++ b/docs/user/operations/server.md @@ -15,11 +15,24 @@ omnigraph-server --cluster --bind 0.0.0.0:8080 startup configs (id, URI, optional per-graph policy, stored-query registry) plus an optional server-level policy, then opens every configured graph in parallel at startup (bounded concurrency = 4, -fail-fast on the first open error). Routing is always multi-graph — +quarantining graph-specific open failures). Routing is always multi-graph — requests to bare flat protected paths (`/read`, `/snapshot`, …) return 404; the served surface is `/graphs/{graph_id}/...`. See [cluster-config.md](../clusters/config.md#serving-from-the-cluster-the-mode-switch) -for what is read and the fail-fast readiness rules. +for what is read and the readiness rules. + +Readiness is fail-fast for cluster-global problems: missing or unreadable +state, invalid/unattributable recovery sidecars, unreadable shared catalog +payloads, cluster policy errors, or zero healthy graphs. Graph-attributed +pending recovery sidecars and graph-specific startup failures quarantine +that graph instead; the server logs startup diagnostics and serves the +remaining healthy graphs. `GET /graphs` enumerates ready/served graphs only, +so quarantined graphs are absent and their routes return 404. + +Operators who want the original all-or-nothing boot contract can pass +`--require-all-graphs` or set `OMNIGRAPH_REQUIRE_ALL_GRAPHS=1`. In that mode, +any graph quarantine, graph-open failure, stored-query startup failure, or +embedding-provider resolution failure aborts startup. A scheme-qualified argument (`s3://…`) reads the ledger straight from the storage root, with no local config directory. `--bind`, @@ -27,7 +40,7 @@ storage root, with no local config directory. `--bind`, ### Stored-query validation at startup -If a graph declares a `queries:` registry (see [cli-reference](../cli/reference.md)), the server **loads and type-checks every stored query against that graph's live schema at startup** and **refuses to boot** if any query references a type or property the schema lacks — the same fail-loud posture as a malformed policy file, so schema drift surfaces at the deploy boundary rather than at invocation. Two MCP-exposed queries claiming the same tool name is likewise a boot error. Non-blocking advisories (e.g. an MCP-exposed query with a vector parameter an agent cannot supply) are logged. Validate offline before deploying with `omnigraph queries validate`. Discover the exposed queries as a typed tool catalog with `GET /queries`, and invoke one over HTTP with `POST /queries/{name}` (both below). +If a graph declares a `queries:` registry (see [cli-reference](../cli/reference.md)), the server **loads and type-checks every stored query against that graph's live schema at startup**. Query parse/type failures quarantine that graph; if no graph remains healthy, startup refuses. Two MCP-exposed queries claiming the same tool name are likewise graph-local startup failures. Non-blocking advisories (e.g. an MCP-exposed query with a vector parameter an agent cannot supply) are logged. Validate offline before deploying with `omnigraph queries validate`. Discover the exposed queries as a typed tool catalog with `GET /queries`, and invoke one over HTTP with `POST /queries/{name}` (both below). ## Endpoint inventory @@ -61,7 +74,7 @@ Server-level management endpoints: | Method | Path | Auth | Action | |---|---|---|---| -| GET | `/graphs` | bearer + `graph_list` on `Server::"root"` | list registered graphs | +| GET | `/graphs` | bearer + `graph_list` on `Server::"root"` | list ready/served graphs | ### Stored-query catalog (`GET /queries`)