fix(storage): flush before acking in local write_text_if_absent

tokio's async File buffers writes internally: write_all only fills the
buffer, and the actual OS write happens in a background task after drop —
so write_text_if_absent could return Ok(true) with the file created but
still EMPTY, and an immediate reader saw EOF. Caught twice in CI as
'EOF while parsing a value' reading state.json right after cluster import
(the cluster's first state-write routes here since the storage port);
also an invariant-6 violation (acknowledged before the write reached the
OS). The other local write paths use tokio::fs::write, which flushes
internally — this was the one miss.

Fix: flush().await before Ok, with the same remove-on-failure cleanup as
the write itself. Regression test is a best-effort tight loop (the window
is timing-dependent; the two CI failures are the recorded red) asserting
read-after-ack never sees a short file.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-12 13:44:51 +03:00
parent 867138499e
commit aabb3dca2e

View file

@ -139,6 +139,18 @@ impl StorageAdapter for LocalStorageAdapter {
let _ = tokio::fs::remove_file(&path).await;
return Err(err.into());
}
// tokio's async File buffers internally: without an explicit flush,
// write_all only fills the buffer and the actual OS write happens in
// a background task AFTER this fn returns — a reader can then see
// the created-but-still-empty file (caught twice in CI as an
// "EOF while parsing" on a state.json read right after import).
// Flushing before Ok restores write-then-read consistency, matching
// tokio::fs::write (which flushes internally) used by every other
// write path here.
if let Err(err) = file.flush().await {
let _ = tokio::fs::remove_file(&path).await;
return Err(err.into());
}
Ok(true)
}
@ -599,6 +611,25 @@ fn env_var_truthy(key: &str) -> bool {
#[cfg(test)]
mod tests {
/// Regression for the write_text_if_absent buffering bug: a reader
/// immediately after Ok(true) must never see the created file empty.
/// The failure is timing-dependent (tokio's background write task), so
/// this loop is a best-effort local reproducer — the recorded red is
/// two CI failures ("EOF while parsing" on a state.json read right
/// after cluster import).
#[tokio::test(flavor = "multi_thread")]
async fn write_text_if_absent_is_read_consistent_immediately() {
let dir = tempfile::tempdir().unwrap();
let adapter = super::storage_for_uri(&format!("file://{}", dir.path().display())).unwrap();
let payload = "x".repeat(64 * 1024);
for i in 0..200 {
let uri = format!("file://{}/f{}.json", dir.path().display(), i);
assert!(adapter.write_text_if_absent(&uri, &payload).await.unwrap());
let read = std::fs::read_to_string(dir.path().join(format!("f{i}.json"))).unwrap();
assert_eq!(read.len(), payload.len(), "iteration {i}: short read");
}
}
#[tokio::test]
async fn local_versioned_cas_roundtrip() {
let dir = tempfile::tempdir().unwrap();