diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 0b005e3..01312c9 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -1032,29 +1032,30 @@ async fn open_multi_graph_state( server_policy_file: Option<&PathBuf>, config_path: PathBuf, ) -> Result { - use futures::StreamExt; + use futures::{StreamExt, TryStreamExt}; if graphs.is_empty() { bail!("multi-graph mode requires at least one graph in the `graphs:` map"); } // Server-level policy (loaded once, applies to management endpoints). - // The placeholder graph_id `"server"` matches the PolicyEngine API - // shape until the Cedar resource-model refactor (PR 6a) lands. + // The placeholder graph_id `"server"` is the sentinel the Cedar + // resource-model refactor maps to the singleton + // `Omnigraph::Server::"root"` entity at evaluation time. let server_policy = match server_policy_file { Some(path) => Some(PolicyEngine::load(path, "server")?), None => None, }; + // `try_collect` propagates the first error eagerly, dropping every + // in-flight open. `buffer_unordered + collect::>` would drain + // the stream before checking errors — incorrect for the docstring's + // "fail-fast" claim and wasteful on S3-backed graphs. let handles: Vec> = futures::stream::iter(graphs.into_iter()) - .map(|cfg| async move { - open_single_graph(cfg).await - }) + .map(|cfg| async move { open_single_graph(cfg).await }) .buffer_unordered(4) - .collect::>() - .await - .into_iter() - .collect::>>()?; + .try_collect() + .await?; let workload = workload::WorkloadController::from_env(); let state = AppState::new_multi(