Merge pull request #204 from ModernRelay/fix/local-write-if-absent-flush

fix(storage): flush before acking in local write_text_if_absent
This commit is contained in:
Andrew Altshuler 2026-06-12 14:12:14 +03:00 committed by GitHub
commit 13ceab3336
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -139,6 +139,18 @@ impl StorageAdapter for LocalStorageAdapter {
let _ = tokio::fs::remove_file(&path).await;
return Err(err.into());
}
// tokio's async File buffers internally: without an explicit flush,
// write_all only fills the buffer and the actual OS write happens in
// a background task AFTER this fn returns — a reader can then see
// the created-but-still-empty file (caught twice in CI as an
// "EOF while parsing" on a state.json read right after import).
// Flushing before Ok restores write-then-read consistency, matching
// tokio::fs::write (which flushes internally) used by every other
// write path here.
if let Err(err) = file.flush().await {
let _ = tokio::fs::remove_file(&path).await;
return Err(err.into());
}
Ok(true)
}
@ -599,6 +611,25 @@ fn env_var_truthy(key: &str) -> bool {
#[cfg(test)]
mod tests {
/// Regression for the write_text_if_absent buffering bug: a reader
/// immediately after Ok(true) must never see the created file empty.
/// The failure is timing-dependent (tokio's background write task), so
/// this loop is a best-effort local reproducer — the recorded red is
/// two CI failures ("EOF while parsing" on a state.json read right
/// after cluster import).
#[tokio::test(flavor = "multi_thread")]
async fn write_text_if_absent_is_read_consistent_immediately() {
let dir = tempfile::tempdir().unwrap();
let adapter = super::storage_for_uri(&format!("file://{}", dir.path().display())).unwrap();
let payload = "x".repeat(64 * 1024);
for i in 0..200 {
let uri = format!("file://{}/f{}.json", dir.path().display(), i);
assert!(adapter.write_text_if_absent(&uri, &payload).await.unwrap());
let read = std::fs::read_to_string(dir.path().join(format!("f{i}.json"))).unwrap();
assert_eq!(read.len(), payload.len(), "iteration {i}: short read");
}
}
#[tokio::test]
async fn local_versioned_cas_roundtrip() {
let dir = tempfile::tempdir().unwrap();