From aabb3dca2eca9dcf19e21886992fd17855726156 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Fri, 12 Jun 2026 13:44:51 +0300 Subject: [PATCH] fix(storage): flush before acking in local write_text_if_absent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tokio's async File buffers writes internally: write_all only fills the buffer, and the actual OS write happens in a background task after drop — so write_text_if_absent could return Ok(true) with the file created but still EMPTY, and an immediate reader saw EOF. Caught twice in CI as 'EOF while parsing a value' reading state.json right after cluster import (the cluster's first state-write routes here since the storage port); also an invariant-6 violation (acknowledged before the write reached the OS). The other local write paths use tokio::fs::write, which flushes internally — this was the one miss. Fix: flush().await before Ok, with the same remove-on-failure cleanup as the write itself. Regression test is a best-effort tight loop (the window is timing-dependent; the two CI failures are the recorded red) asserting read-after-ack never sees a short file. Co-Authored-By: Claude Fable 5 --- crates/omnigraph/src/storage.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/crates/omnigraph/src/storage.rs b/crates/omnigraph/src/storage.rs index 978d1ce..187a6d6 100644 --- a/crates/omnigraph/src/storage.rs +++ b/crates/omnigraph/src/storage.rs @@ -139,6 +139,18 @@ impl StorageAdapter for LocalStorageAdapter { let _ = tokio::fs::remove_file(&path).await; return Err(err.into()); } + // tokio's async File buffers internally: without an explicit flush, + // write_all only fills the buffer and the actual OS write happens in + // a background task AFTER this fn returns — a reader can then see + // the created-but-still-empty file (caught twice in CI as an + // "EOF while parsing" on a state.json read right after import). + // Flushing before Ok restores write-then-read consistency, matching + // tokio::fs::write (which flushes internally) used by every other + // write path here. + if let Err(err) = file.flush().await { + let _ = tokio::fs::remove_file(&path).await; + return Err(err.into()); + } Ok(true) } @@ -599,6 +611,25 @@ fn env_var_truthy(key: &str) -> bool { #[cfg(test)] mod tests { + /// Regression for the write_text_if_absent buffering bug: a reader + /// immediately after Ok(true) must never see the created file empty. + /// The failure is timing-dependent (tokio's background write task), so + /// this loop is a best-effort local reproducer — the recorded red is + /// two CI failures ("EOF while parsing" on a state.json read right + /// after cluster import). + #[tokio::test(flavor = "multi_thread")] + async fn write_text_if_absent_is_read_consistent_immediately() { + let dir = tempfile::tempdir().unwrap(); + let adapter = super::storage_for_uri(&format!("file://{}", dir.path().display())).unwrap(); + let payload = "x".repeat(64 * 1024); + for i in 0..200 { + let uri = format!("file://{}/f{}.json", dir.path().display(), i); + assert!(adapter.write_text_if_absent(&uri, &payload).await.unwrap()); + let read = std::fs::read_to_string(dir.path().join(format!("f{i}.json"))).unwrap(); + assert_eq!(read.len(), payload.len(), "iteration {i}: short read"); + } + } + #[tokio::test] async fn local_versioned_cas_roundtrip() { let dir = tempfile::tempdir().unwrap();