From 0da750c0965292918ec9d355d8bd3b0daff7d817 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Wed, 27 May 2026 12:03:28 +0200 Subject: [PATCH] mr-668: fail-fast multi-graph startup with try_collect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `open_multi_graph_state` doc comment claims "Fail-fast — the first open error aborts startup; other in-flight opens are dropped" but the code did .buffer_unordered(4) .collect::<Vec<_>>() .await .into_iter() .collect::<Result<Vec<_>>>()?; which drains every future in the stream before propagating the first `Err`. With N S3-backed graphs and graph #2 failing fast, the caller still waits for #1, #3, #4, … to either succeed or fail before seeing the error. Replace the four-line dance with `futures::TryStreamExt::try_collect`, which short-circuits on the first `Err` and drops the rest. The doc comment now matches behavior. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph-server/src/lib.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 0b005e3..01312c9 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -1032,29 +1032,30 @@ async fn open_multi_graph_state( server_policy_file: Option<&PathBuf>, config_path: PathBuf, ) -> Result { - use futures::StreamExt; + use futures::{StreamExt, TryStreamExt}; if graphs.is_empty() { bail!("multi-graph mode requires at least one graph in the `graphs:` map"); } // Server-level policy (loaded once, applies to management endpoints). - // The placeholder graph_id `"server"` matches the PolicyEngine API - // shape until the Cedar resource-model refactor (PR 6a) lands. + // The placeholder graph_id `"server"` is the sentinel the Cedar + // resource-model refactor maps to the singleton + // `Omnigraph::Server::"root"` entity at evaluation time. 
let server_policy = match server_policy_file { Some(path) => Some(PolicyEngine::load(path, "server")?), None => None, }; + // `try_collect` propagates the first error eagerly, dropping every + // in-flight open. `buffer_unordered + collect::<Vec<_>>` would drain + // the stream before checking errors — incorrect for the docstring's + // "fail-fast" claim and wasteful on S3-backed graphs. let handles: Vec> = futures::stream::iter(graphs.into_iter()) - .map(|cfg| async move { - open_single_graph(cfg).await - }) + .map(|cfg| async move { open_single_graph(cfg).await }) .buffer_unordered(4) - .collect::>() - .await - .into_iter() - .collect::>>()?; + .try_collect() + .await?; let workload = workload::WorkloadController::from_env(); let state = AppState::new_multi(