fix(cluster): stop cluster-apply crash-loops from the recovery-sidecar trap (#284)

* fix(cluster): stop cluster-apply crash-loops from the recovery-sidecar trap

A `cluster apply` carrying a schema change against a graph that has
non-main branches, or an unsupported "needs backfill" migration, armed a
recovery sidecar *before* calling the engine, then left it behind when the
engine rejected the apply pre-movement. The server refuses to boot while
any sidecar is pending, and re-running apply re-armed a fresh sidecar, so
the result was an inescapable crash loop. None of the engine rejections
are bugs; the trap is in the apply/serve choreography.

Three coordinated changes:

1. Preview before arming the sidecar. `cluster apply` now runs
   `preview_schema_apply_with_options` before `write_recovery_sidecar`, so
   parser/planner rejections (non-main branches, unsupported plan) fail
   loudly without leaving recovery work behind. The post-preview engine
   error path now deletes the sidecar when the live schema still matches
   the recorded digest (nothing moved), and keeps it only on real
   mid-movement failure — both branches covered by new engine-failpoint
   tests (cluster failpoints now enable omnigraph/failpoints).

2. Per-graph quarantine at serve time instead of whole-cluster refusal.
   A graph-attributed pending sidecar, an unopenable graph root, a query
   parse failure, or an unresolvable embedding provider now quarantines
   just that graph (logged loudly at every boot layer) while healthy
   graphs serve; `/graphs` lists only ready graphs, and routes for a
   quarantined graph return 404. Cluster-global problems
   (missing/unreadable state, malformed or
   unattributable sidecars, shared-catalog or cluster-policy errors, zero
   healthy graphs) stay fail-fast. `--require-all-graphs` /
   OMNIGRAPH_REQUIRE_ALL_GRAPHS=1 restores all-or-nothing boot.

3. Backfill embedding-provider profile metadata on apply. Mirrors the
   existing policy-binding backfill: a pre-5A ledger missing
   `embedding_profile` is now detected as a metadata-only change and
   backfilled by a no-op apply, instead of bricking serve with
   `embedding_provider_profile_missing` forever.
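
The ordering in change 1 can be sketched as below. This is a minimal, in-memory illustration only: `SidecarStore`, `ApplyError`, the `apply` signature, and the closure-based preview/engine stand-ins are all hypothetical and are not the real cluster or engine API. It also assumes that a successful apply clears the sidecar, which the message above does not state.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the pending recovery sidecars, keyed by graph id.
#[derive(Default)]
struct SidecarStore {
    pending: HashSet<String>,
}

#[derive(Debug, PartialEq)]
enum ApplyError {
    /// The preview rejected the plan; no sidecar was ever armed.
    RejectedByPreview(String),
    /// The engine failed after the preview passed.
    EngineFailed { sidecar_kept: bool },
}

/// Hypothetical apply flow: preview, then arm, then call the engine.
/// `engine` returns `Err(live_digest)` with the schema digest observed
/// after the failure (an assumption made for this sketch).
fn apply(
    store: &mut SidecarStore,
    graph: &str,
    recorded_digest: u64,
    preview: impl Fn() -> Result<(), String>,
    engine: impl Fn() -> Result<(), u64>,
) -> Result<(), ApplyError> {
    // 1. Preview first: a rejection here leaves no recovery work behind.
    preview().map_err(ApplyError::RejectedByPreview)?;

    // 2. Only now arm the sidecar.
    store.pending.insert(graph.to_string());

    // 3. Call the engine and decide what happens to the sidecar.
    match engine() {
        Ok(()) => {
            // Assumption: a completed apply clears its sidecar.
            store.pending.remove(graph);
            Ok(())
        }
        Err(live_digest) => {
            // Digest unchanged means nothing moved, so the sidecar is deleted.
            let nothing_moved = live_digest == recorded_digest;
            if nothing_moved {
                store.pending.remove(graph);
            }
            Err(ApplyError::EngineFailed {
                sidecar_kept: !nothing_moved,
            })
        }
    }
}
```

In this sketch a preview rejection and a pre-movement engine failure both leave `pending` empty, while a mid-movement failure (digest changed) keeps the sidecar for recovery.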

Tests: trap (no sidecar after a rejected apply), both digest-cleanup
branches, per-graph quarantine (cluster + server), embedding backfill.
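
The per-graph quarantine decision from change 2 can be sketched synchronously as follows. `select_ready` and its string-based inputs are hypothetical simplifications for illustration (the real code opens graphs concurrently and logs through tracing); only the three outcomes are taken from the description above.

```rust
/// Hypothetical helper: partition per-graph open results into ready graphs
/// and quarantined ones, then apply the boot policy.
/// Each item is `Ok(graph_id)` or `Err((graph_id, error_message))`.
fn select_ready(
    results: Vec<Result<String, (String, String)>>,
    require_all_graphs: bool,
) -> Result<Vec<String>, String> {
    let configured = results.len();
    let mut ready = Vec::new();
    let mut failed = 0usize;

    for result in results {
        match result {
            Ok(graph_id) => ready.push(graph_id),
            Err((graph_id, err)) => {
                // Graph-local failure: quarantine this graph and keep going.
                failed += 1;
                eprintln!("graph quarantined during startup: {graph_id}: {err}");
            }
        }
    }

    // Strict mode: any quarantined graph aborts startup.
    if require_all_graphs && failed > 0 {
        return Err(format!(
            "strict startup requires every graph to open ({configured} configured, {failed} failed)"
        ));
    }
    // Zero healthy graphs stays fail-fast even in the default mode.
    if ready.is_empty() {
        return Err(format!(
            "no healthy graphs ({configured} configured, {failed} failed)"
        ));
    }
    Ok(ready)
}
```

With one healthy and one failing graph, the default mode returns only the healthy graph, while strict mode returns an error.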

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* docs: resilient cluster boot + recovery-sidecar trap fix

Amend RFC-005 D4 readiness posture (cluster-global fail-fast vs graph-local
quarantine; deviation #5 for --require-all-graphs), add the v0.7.0 release
note, and update the user cluster/server/deployment docs and the docs for
the OMNIGRAPH_REQUIRE_ALL_GRAPHS env var.
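
As an illustration of which values switch strict boot on, the sketch below mirrors the truthiness rule used by the `env_flag` helper in this change: a value counts as set when it is non-empty after trimming and is neither `0` nor `false` (case-insensitive). The function names `flag_value_is_truthy` and `require_all_graphs` are hypothetical; the real helper reads the environment directly.

```rust
/// Hypothetical pure version of the flag check, taking the raw value
/// instead of reading the environment, so it is easy to test.
fn flag_value_is_truthy(raw: Option<&str>) -> bool {
    raw.map(|v| {
        let trimmed = v.trim();
        !trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
    })
    .unwrap_or(false)
}

/// Strict boot is on when either the CLI flag or the env var asks for it.
fn require_all_graphs(cli_flag: bool) -> bool {
    let env = std::env::var("OMNIGRAPH_REQUIRE_ALL_GRAPHS").ok();
    cli_flag || flag_value_is_truthy(env.as_deref())
}
```

Under this rule `1`, `true`, and `yes` all enable strict boot, while an unset variable, an empty or whitespace-only value, `0`, and `FALSE` leave the default quarantine behavior in place.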

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(cluster): surface sidecar-cleanup failures; document severity promotion

Address Greptile review on PR #284:

- The pre-movement sidecar cleanup fast-path discarded `delete_object`'s
  result, so a transient delete failure left the graph quarantined with no
  signal. Add `try_delete_object` (Result-returning) and emit a
  `recovery_sidecar_cleanup_failed` warning diagnostic on failure; the
  fire-and-forget `delete_object` now delegates to it.
- Document why the serve-time loop promotes every `list_recovery_sidecars`
  diagnostic to a cluster-fatal error (the listing only emits genuine
  read/parse/version failures, as warnings, whose blast radius serving
  cannot prove) and note the promote-by-code path if that ever changes.
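
The first bullet can be sketched as below. Everything here is a hypothetical stand-in: `ObjectStore`, `Diagnostic`, `cleanup_sidecar`, and the method signatures are invented for illustration and are not the real storage API. Only the names `try_delete_object`, `delete_object`, and the `recovery_sidecar_cleanup_failed` code come from the message above.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
struct Diagnostic {
    code: &'static str,
    message: String,
}

/// Hypothetical in-memory object store; `fail_deletes` simulates a
/// transient delete failure.
struct ObjectStore {
    objects: HashMap<String, Vec<u8>>,
    fail_deletes: bool,
}

impl ObjectStore {
    /// Result-returning delete: the caller decides how to react.
    fn try_delete_object(&mut self, key: &str) -> Result<(), String> {
        if self.fail_deletes {
            return Err(format!("transient failure deleting {key}"));
        }
        self.objects.remove(key);
        Ok(())
    }

    /// Fire-and-forget delete, now a thin wrapper that drops the result.
    fn delete_object(&mut self, key: &str) {
        let _ = self.try_delete_object(key);
    }
}

/// Pre-movement cleanup: a failed delete becomes a warning diagnostic
/// instead of being silently discarded.
fn cleanup_sidecar(store: &mut ObjectStore, key: &str, diagnostics: &mut Vec<Diagnostic>) {
    if let Err(err) = store.try_delete_object(key) {
        diagnostics.push(Diagnostic {
            code: "recovery_sidecar_cleanup_failed",
            message: err,
        });
    }
}
```

Keeping `delete_object` as a wrapper leaves existing fire-and-forget call sites unchanged while giving the cleanup path a result it can report.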

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Andrew Altshuler 2026-06-19 03:34:15 +03:00 committed by GitHub
parent 7168ee0ed0
commit 7fd23c54a3
21 changed files with 1043 additions and 203 deletions


@ -1,9 +1,9 @@
pub mod api;
mod handlers;
mod settings;
pub use settings::{load_server_settings, classify_server_runtime_state, ServerRuntimeState};
use settings::*;
use handlers::*;
use settings::*;
pub use settings::{ServerRuntimeState, classify_server_runtime_state, load_server_settings};
pub mod auth;
pub mod graph_id;
pub mod identity;
@ -29,10 +29,10 @@ use api::{
BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse,
HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest,
InvokeStoredQueryResponse, QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest,
SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output,
schema_apply_output, snapshot_payload,
HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest, InvokeStoredQueryResponse,
QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest, SchemaApplyOutput,
SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output, schema_apply_output,
snapshot_payload,
};
pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
use axum::body::{Body, Bytes};
@ -166,6 +166,10 @@ pub struct ServerConfig {
/// who set up auth and forgot the policy file would otherwise ship
/// the illusion of protection.
pub allow_unauthenticated: bool,
/// Operator opt-in for fail-fast cluster boot. By default, graph-local
/// startup failures quarantine that graph and healthy graphs still serve.
/// When true, any quarantined or failed graph aborts startup.
pub require_all_graphs: bool,
}
/// What `load_server_settings` produces. RFC-011 cluster-only: the
@ -303,7 +307,14 @@ impl AppState {
) -> Self {
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
let per_graph_policy = policy_engine.map(Arc::new);
Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload), None)
Self::build_single_mode(
uri,
db,
bearer_tokens,
per_graph_policy,
Arc::new(workload),
None,
)
}
/// Like `new_single`, but attaches a pre-validated stored-query
@ -420,13 +431,8 @@ impl AppState {
bearer_tokens: Vec<(String, String)>,
policy_file: Option<&PathBuf>,
) -> Result<Self> {
Self::open_single_with_queries(
uri,
bearer_tokens,
policy_file,
QueryRegistry::default(),
)
.await
Self::open_single_with_queries(uri, bearer_tokens, policy_file, QueryRegistry::default())
.await
}
/// Single-mode boot with a stored-query registry: open the engine,
@ -509,8 +515,7 @@ impl AppState {
// reserved id `default` — both the registry key and the URL
// segment (`/graphs/default/...`).
let uri = normalize_root_uri(&uri).unwrap_or(uri);
let graph_id =
GraphId::try_from("default").expect("'default' is a valid GraphId");
let graph_id = GraphId::try_from("default").expect("'default' is a valid GraphId");
let key = GraphKey::cluster(graph_id);
let handle = Arc::new(GraphHandle {
key,
@ -889,15 +894,21 @@ pub fn build_app(state: AppState) -> Router {
// flagged and their responses include RFC 9745 Deprecation +
// RFC 8288 Link headers. Suppress the call-site warning for the
// route registration itself.
.route("/read", post({
#[allow(deprecated)]
server_read
}))
.route(
"/read",
post({
#[allow(deprecated)]
server_read
}),
)
.route("/query", post(server_query))
.route("/change", post({
#[allow(deprecated)]
server_change
}))
.route(
"/change",
post({
#[allow(deprecated)]
server_change
}),
)
.route("/mutate", post(server_mutate))
.route("/queries", get(server_list_queries))
.route("/queries/{name}", post(server_invoke_query))
@ -1013,7 +1024,14 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
config = %config_path.display(),
"serving omnigraph"
);
open_multi_graph_state(graphs, tokens, server_policy.as_ref(), config_path).await?
open_multi_graph_state(
graphs,
tokens,
server_policy.as_ref(),
config_path,
config.require_all_graphs,
)
.await?
}
};
@ -1033,9 +1051,9 @@ fn load_graph_policy(source: &PolicySource, graph_id: &str) -> Result<PolicyEngi
}
/// Parallel open of every graph in the startup config, with bounded
/// concurrency (`buffer_unordered(4)`). Fail-fast — the first open error
/// aborts startup; other in-flight opens are dropped (their `Omnigraph`
/// instances close cleanly via Arc drop).
/// concurrency (`buffer_unordered(4)`). Graph-specific open failures
/// quarantine that graph; startup succeeds as long as at least one graph
/// opens.
///
/// The bound 4 is a rule-of-thumb for I/O-bound work. At N ≤ 10 this
/// trades startup latency for a small amount of concurrent S3 / Lance
@ -1045,8 +1063,9 @@ pub async fn open_multi_graph_state(
tokens: Vec<(String, String)>,
server_policy_source: Option<&PolicySource>,
config_path: PathBuf,
require_all_graphs: bool,
) -> Result<AppState> {
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
if graphs.is_empty() {
bail!("multi-graph mode requires at least one graph in the `graphs:` map");
@ -1058,21 +1077,48 @@ pub async fn open_multi_graph_state(
// `Omnigraph::Server::"root"` entity at evaluation time.
let server_policy = match server_policy_source {
Some(PolicySource::File(path)) => Some(PolicyEngine::load_server(path)?),
Some(PolicySource::Inline(source)) => {
Some(PolicyEngine::load_server_from_source(source)?)
}
Some(PolicySource::Inline(source)) => Some(PolicyEngine::load_server_from_source(source)?),
None => None,
};
// `try_collect` propagates the first error eagerly, dropping every
// in-flight open. `buffer_unordered + collect::<Vec<_>>` would drain
// the stream before checking errors — incorrect for the docstring's
// "fail-fast" claim and wasteful on S3-backed graphs.
let handles: Vec<Arc<GraphHandle>> = futures::stream::iter(graphs.into_iter())
.map(|cfg| async move { open_single_graph(cfg).await })
let configured_graphs = graphs.len();
let results = futures::stream::iter(graphs.into_iter())
.map(|cfg| async move {
let graph_id = cfg.graph_id.clone();
open_single_graph(cfg).await.map_err(|err| (graph_id, err))
})
.buffer_unordered(4)
.try_collect()
.await?;
.collect::<Vec<_>>()
.await;
let mut handles = Vec::new();
let mut failed = 0usize;
for result in results {
match result {
Ok(handle) => handles.push(handle),
Err((graph_id, err)) => {
failed += 1;
warn!(
graph_id = %graph_id,
error = %err,
"graph quarantined during startup"
);
}
}
}
if require_all_graphs && failed > 0 {
bail!(
"strict multi-graph startup requires every graph to open ({} configured, {} failed)",
configured_graphs,
failed
);
}
if handles.is_empty() {
bail!(
"no healthy graphs opened from multi-graph startup config ({} configured, {} failed)",
configured_graphs,
failed
);
}
let workload = workload::WorkloadController::from_env();
let state = AppState::new_multi(handles, tokens, server_policy, workload, Some(config_path))


@ -22,6 +22,11 @@ struct Cli {
/// Equivalent to setting `OMNIGRAPH_UNAUTHENTICATED=1`.
#[arg(long)]
unauthenticated: bool,
/// Fail startup if any applied graph is quarantined or fails to open.
/// By default, graph-local failures are logged and healthy graphs still
/// serve. Equivalent to setting `OMNIGRAPH_REQUIRE_ALL_GRAPHS=1`.
#[arg(long)]
require_all_graphs: bool,
}
#[tokio::main]
@ -30,7 +35,12 @@ async fn main() -> Result<()> {
init_tracing();
let cli = Cli::parse();
let settings: ServerConfig =
load_server_settings(cli.cluster.as_ref(), cli.bind, cli.unauthenticated).await?;
let settings: ServerConfig = load_server_settings(
cli.cluster.as_ref(),
cli.bind,
cli.unauthenticated,
cli.require_all_graphs,
)
.await?;
serve(settings).await
}


@ -12,6 +12,7 @@ pub(crate) async fn load_cluster_settings(
cluster_dir: &PathBuf,
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
cli_require_all_graphs: bool,
) -> Result<ServerConfig> {
// `--cluster` accepts either a config directory (the ledger location is
// resolved through cluster.yaml's `storage:` key) or a storage-root URI
@ -28,11 +29,45 @@ pub(crate) async fn load_cluster_settings(
.map_err(|diagnostics| {
let details = diagnostics
.iter()
.map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message))
.map(|diagnostic| {
format!(
"[{}] {}: {}",
diagnostic.code, diagnostic.path, diagnostic.message
)
})
.collect::<Vec<_>>()
.join("\n ");
eyre!("the cluster at '{}' is not ready to serve:\n {details}", cluster_dir.display())
eyre!(
"the cluster at '{}' is not ready to serve:\n {details}",
cluster_dir.display()
)
})?;
for diagnostic in &snapshot.diagnostics {
warn!(
code = %diagnostic.code,
path = %diagnostic.path,
message = %diagnostic.message,
"cluster startup diagnostic"
);
}
let env_require_all_graphs = env_flag("OMNIGRAPH_REQUIRE_ALL_GRAPHS");
let require_all_graphs = cli_require_all_graphs || env_require_all_graphs;
if require_all_graphs && !snapshot.diagnostics.is_empty() {
let details = snapshot
.diagnostics
.iter()
.map(|diagnostic| {
format!(
"[{}] {}: {}",
diagnostic.code, diagnostic.path, diagnostic.message
)
})
.collect::<Vec<_>>()
.join("\n ");
bail!(
"strict cluster boot requires every applied graph to be ready; startup diagnostics:\n {details}"
);
}
// Bindings -> Cedar slots. The serving pipeline loads one bundle per
// graph plus one server-level bundle; stacked bundles per scope are a
@ -69,6 +104,7 @@ pub(crate) async fn load_cluster_settings(
}
let mut graphs = Vec::new();
let mut skipped_graphs = Vec::new();
for graph in &snapshot.graphs {
let specs: Vec<queries::RegistrySpec> = snapshot
.queries
@ -84,40 +120,75 @@ pub(crate) async fn load_cluster_settings(
tool_name: None,
})
.collect();
let registry = QueryRegistry::from_specs(specs).map_err(|errors| {
let details = errors
.iter()
.map(|error| error.to_string())
.collect::<Vec<_>>()
.join("\n ");
eyre!(
"stored queries in the applied revision failed to parse:\n {details}\nrun `cluster refresh` then `cluster apply`, and restart"
)
})?;
let registry = match QueryRegistry::from_specs(specs) {
Ok(registry) => registry,
Err(errors) => {
let details = errors
.iter()
.map(|error| error.to_string())
.collect::<Vec<_>>()
.join("\n ");
warn!(
graph_id = %graph.graph_id,
errors = %details,
"graph quarantined because stored queries failed to parse"
);
skipped_graphs.push(format!(
"{}: stored queries failed to parse: {details}",
graph.graph_id
));
continue;
}
};
let embedding = match graph
.embedding
.as_ref()
.map(|profile| {
profile.resolve().map_err(|err| {
eyre!("embedding provider for graph '{}': {err}", graph.graph_id)
})
})
.transpose()
{
Ok(embedding) => embedding,
Err(err) => {
warn!(
graph_id = %graph.graph_id,
error = %err,
"graph quarantined because embedding provider configuration failed"
);
skipped_graphs.push(format!("{}: {err}", graph.graph_id));
continue;
}
};
graphs.push(GraphStartupConfig {
graph_id: graph.graph_id.clone(),
uri: graph.root.to_string_lossy().to_string(),
policy: graph_policies.get(&graph.graph_id).cloned(),
embedding: graph
.embedding
.as_ref()
.map(|profile| {
profile.resolve().map_err(|err| {
eyre!("embedding provider for graph '{}': {err}", graph.graph_id)
})
})
.transpose()?,
embedding,
queries: registry,
});
}
if graphs.is_empty() {
let skipped = skipped_graphs.join(", ");
bail!(
"the cluster at '{}' has no healthy graphs to serve{}",
cluster_dir.display(),
if skipped.is_empty() {
String::new()
} else {
format!(" (quarantined: {skipped})")
}
);
}
if require_all_graphs && !skipped_graphs.is_empty() {
bail!(
"strict cluster boot requires every graph to build startup settings (quarantined: {})",
skipped_graphs.join(", ")
);
}
let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED")
.ok()
.map(|v| {
let trimmed = v.trim();
!trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
})
.unwrap_or(false);
let env_unauth = env_flag("OMNIGRAPH_UNAUTHENTICATED");
Ok(ServerConfig {
mode: ServerConfigMode::Multi {
@ -127,6 +198,7 @@ pub(crate) async fn load_cluster_settings(
},
bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()),
allow_unauthenticated: cli_allow_unauthenticated || env_unauth,
require_all_graphs,
})
}
@ -138,6 +210,7 @@ pub async fn load_server_settings(
cli_cluster: Option<&PathBuf>,
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
cli_require_all_graphs: bool,
) -> Result<ServerConfig> {
let Some(cluster_dir) = cli_cluster else {
bail!(
@ -147,7 +220,23 @@ pub async fn load_server_settings(
was removed in RFC-011."
);
};
load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated).await
load_cluster_settings(
cluster_dir,
cli_bind,
cli_allow_unauthenticated,
cli_require_all_graphs,
)
.await
}
fn env_flag(name: &str) -> bool {
std::env::var(name)
.ok()
.map(|v| {
let trimmed = v.trim();
!trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
})
.unwrap_or(false)
}
/// MR-723 server runtime state, classified from the three-state matrix
@ -240,7 +329,9 @@ pub(crate) fn read_bearer_tokens_file(path: &str) -> Result<Vec<(String, String)
.wrap_err_with(|| format!("failed to parse bearer tokens file at {path}"))
}
pub(crate) fn validate_bearer_tokens(entries: Vec<(String, String)>) -> Result<Vec<(String, String)>> {
pub(crate) fn validate_bearer_tokens(
entries: Vec<(String, String)>,
) -> Result<Vec<(String, String)>> {
let mut seen_actors = HashSet::new();
let mut seen_tokens = HashSet::new();
let mut normalized = Vec::with_capacity(entries.len());
@ -301,11 +392,18 @@ mod tests {
/// as 404 without also masking a 401/500. Pins each outcome.
#[test]
fn authorize_splits_decision_from_operational_error() {
use super::{Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor, authorize};
use super::{
Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor,
authorize,
};
use std::sync::Arc;
fn req(action: PolicyAction) -> PolicyRequest {
PolicyRequest { action, branch: None, target_branch: None }
PolicyRequest {
action,
branch: None,
target_branch: None,
}
}
let actor = ResolvedActor::cluster_static(Arc::from("act-alice"));
@ -345,7 +443,11 @@ mod tests {
authorize(
Some(&actor),
Some(&engine),
PolicyRequest { action: PolicyAction::Read, branch: Some("main".to_string()), target_branch: None },
PolicyRequest {
action: PolicyAction::Read,
branch: Some("main".to_string()),
target_branch: None,
},
)
.unwrap(),
Authz::Allowed
@ -354,11 +456,17 @@ mod tests {
match authorize(
Some(&actor),
Some(&engine),
PolicyRequest { action: PolicyAction::Change, branch: Some("main".to_string()), target_branch: None },
PolicyRequest {
action: PolicyAction::Change,
branch: Some("main".to_string()),
target_branch: None,
},
)
.unwrap()
{
Authz::Denied(message) => assert!(!message.is_empty(), "a deny carries its decision message"),
Authz::Denied(message) => {
assert!(!message.is_empty(), "a deny carries its decision message")
}
Authz::Allowed => panic!("change must be denied: only read is allowed"),
}
// Policy installed but no actor → operational failure (`Err`), NOT a
@ -397,8 +505,7 @@ mod tests {
};
// Empty registry → nothing attached, no error.
let empty =
super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap();
let empty = super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap();
assert!(empty.is_none());
// A query that type-checks → attached.
@ -407,7 +514,11 @@ mod tests {
"query find_user() { match { $u: User } return { $u.name } }",
)])
.unwrap();
assert!(super::validate_and_attach(ok, &catalog, "g").unwrap().is_some());
assert!(
super::validate_and_attach(ok, &catalog, "g")
.unwrap()
.is_some()
);
// A query referencing a type the schema lacks → boot refusal that
// names both the graph label and the offending query.
@ -420,7 +531,10 @@ mod tests {
let msg = err.to_string();
assert!(msg.contains("graph-x"), "labels the graph: {msg}");
assert!(msg.contains("ghost"), "names the query: {msg}");
assert!(msg.contains("schema check"), "mentions the schema check: {msg}");
assert!(
msg.contains("schema check"),
"mentions the schema check: {msg}"
);
}
#[test]
@ -451,7 +565,7 @@ mod tests {
async fn server_settings_require_cluster_boot_source() {
// RFC-011 cluster-only: with no --cluster the server refuses to
// start and names the cluster-required remedy.
let error = super::load_server_settings(None, None, false)
let error = super::load_server_settings(None, None, false, false)
.await
.unwrap_err();
assert!(
@ -534,6 +648,7 @@ mod tests {
},
bind: "127.0.0.1:0".to_string(),
allow_unauthenticated: false,
require_all_graphs: false,
};
let result = serve(config).await;
let err = result
@ -586,6 +701,7 @@ mod tests {
},
bind: "127.0.0.1:0".to_string(),
allow_unauthenticated: false,
require_all_graphs: false,
};
let result = serve(config).await;
let err =