mr-668: fail-fast multi-graph startup with try_collect

The `open_multi_graph_state` doc comment claims "Fail-fast — the
first open error aborts startup; other in-flight opens are dropped"
but the code did

    .buffer_unordered(4)
    .collect::<Vec<_>>()
    .await
    .into_iter()
    .collect::<Result<Vec<_>>>()?;

which drains every future in the stream before propagating the first
`Err`. With N S3-backed graphs and graph #2 failing fast, the caller
still waits for #1, #3, #4, … to either succeed or fail before
seeing the error.

Replace the four-line dance with `futures::TryStreamExt::try_collect`,
which short-circuits on the first `Err` and drops the rest. The
doc comment now matches behavior.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-27 12:03:28 +02:00
parent ca474e23b1
commit 0da750c096
No known key found for this signature in database

View file

@ -1032,29 +1032,30 @@ async fn open_multi_graph_state(
server_policy_file: Option<&PathBuf>,
config_path: PathBuf,
) -> Result<AppState> {
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
if graphs.is_empty() {
bail!("multi-graph mode requires at least one graph in the `graphs:` map");
}
// Server-level policy (loaded once, applies to management endpoints).
// The placeholder graph_id `"server"` matches the PolicyEngine API
// shape until the Cedar resource-model refactor (PR 6a) lands.
// The placeholder graph_id `"server"` is the sentinel the Cedar
// resource-model refactor maps to the singleton
// `Omnigraph::Server::"root"` entity at evaluation time.
let server_policy = match server_policy_file {
Some(path) => Some(PolicyEngine::load(path, "server")?),
None => None,
};
// `try_collect` propagates the first error eagerly, dropping every
// in-flight open. `buffer_unordered + collect::<Vec<_>>` would drain
// the stream before checking errors — incorrect for the docstring's
// "fail-fast" claim and wasteful on S3-backed graphs.
let handles: Vec<Arc<GraphHandle>> = futures::stream::iter(graphs.into_iter())
.map(|cfg| async move {
open_single_graph(cfg).await
})
.map(|cfg| async move { open_single_graph(cfg).await })
.buffer_unordered(4)
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
.try_collect()
.await?;
let workload = workload::WorkloadController::from_env();
let state = AppState::new_multi(