diff --git a/crates/omnigraph/src/storage.rs b/crates/omnigraph/src/storage.rs index 978d1ce..187a6d6 100644 --- a/crates/omnigraph/src/storage.rs +++ b/crates/omnigraph/src/storage.rs @@ -139,6 +139,18 @@ impl StorageAdapter for LocalStorageAdapter { let _ = tokio::fs::remove_file(&path).await; return Err(err.into()); } + // tokio's async File buffers internally: without an explicit flush, + // write_all only fills the buffer and the actual OS write can still be + // pending in a background task AFTER this fn returns — a reader can then + // see the created-but-still-empty file (caught twice in CI as an + // "EOF while parsing" on a state.json read right after import). + // Flushing before Ok restores write-then-read consistency, matching + // tokio::fs::write (which awaits the blocking write to completion), used + // by every other write path here. + if let Err(err) = file.flush().await { + let _ = tokio::fs::remove_file(&path).await; + return Err(err.into()); + } Ok(true) } @@ -599,6 +611,25 @@ fn env_var_truthy(key: &str) -> bool { #[cfg(test)] mod tests { + /// Regression for the write_text_if_absent buffering bug: a reader + /// immediately after Ok(true) must never see the created file empty. + /// The failure is timing-dependent (tokio's background write task), so + /// this loop is only a best-effort local reproducer; the failures on + /// record are two CI runs ("EOF while parsing" on a state.json read + /// right after cluster import).
+    #[tokio::test(flavor = "multi_thread")] // multi-threaded tokio runtime for this test
+    async fn write_text_if_absent_is_read_consistent_immediately() {
+        let dir = tempfile::tempdir().unwrap(); // temporary directory holding every file below
+        let adapter = super::storage_for_uri(&format!("file://{}", dir.path().display())).unwrap();
+        let payload = "x".repeat(64 * 1024); // 64 KiB of 'x'
+        for i in 0..200 { // 200 rounds, one new name (f0.json .. f199.json) per round
+            let uri = format!("file://{}/f{}.json", dir.path().display(), i); // presumably maps to the path read below
+            assert!(adapter.write_text_if_absent(&uri, &payload).await.unwrap()); // must yield Ok(true)
+            let read = std::fs::read_to_string(dir.path().join(format!("f{i}.json"))).unwrap();
+            assert_eq!(read.len(), payload.len(), "iteration {i}: short read"); // sync read above; lengths only
+        }
+    }
+
     #[tokio::test]
     async fn local_versioned_cas_roundtrip() {
         let dir = tempfile::tempdir().unwrap();