This commit is contained in:
elipeter 2026-06-03 22:10:01 -05:00
parent 425a9ed2a6
commit 2e12c19c48
2 changed files with 175 additions and 21 deletions

View file

@ -40,6 +40,7 @@
use super::{BuildPool, PoolCompileResult};
use serde::Deserialize;
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
@ -230,36 +231,141 @@ fn bootstrap_dir_for(toolchain_id: &str) -> Result<PathBuf, String> {
/// `dir` if they are not already present. Always re-writes the source
/// when the on-disk copy differs from the embedded one so a binary
/// upgrade picks up worker fixes without manual cache eviction.
///
/// The bootstrap dir is shared across every concurrent `nyx` process on
/// the host, so the compile-and-publish step is hardened against the
/// cross-process race that otherwise hands a half-written
/// `NyxJavacWorker.class` to a peer process spawning its worker (which
/// then fails to start, manifesting downstream as a flaky build):
///
/// - The publish is **atomic**: `javac` writes into a private,
/// pid-scoped staging dir and the finished class is `rename`d into
/// place. A concurrent reader sees either the previous complete
/// class or the new one, never a partial file. The old class is
/// never `remove`d first.
/// - Compiles are **serialised** on a `flock(2)` over `.bootstrap.lock`
/// so two processes never run `javac` into the same staging at once
/// and a waiter re-checks the now-published class instead of
/// recompiling.
fn ensure_worker_compiled(dir: &Path) -> Result<(), String> {
let src_path = dir.join(WORKER_FILENAME);
let class_path = dir.join(format!("{WORKER_CLASS}.class"));
let on_disk = std::fs::read_to_string(&src_path).ok();
let needs_write = on_disk.as_deref() != Some(WORKER_SOURCE);
if needs_write {
std::fs::write(&src_path, WORKER_SOURCE)
.map_err(|e| format!("javac-pool: write worker source: {e}"))?;
// Force a recompile if the source bytes changed under us.
let _ = std::fs::remove_file(&class_path);
}
if class_path.exists() {
// Fast path: a complete class already matches the current worker
// source. Checked before taking the cross-process lock so steady
// state stays lock-free.
if worker_class_ready(&src_path, &class_path) {
return Ok(());
}
// Serialise the compile-and-publish across processes sharing `dir`.
let _lock = BootstrapLock::acquire(dir)?;
// Re-check under the lock: another process may have published a good
// class while we were waiting on the lock.
if worker_class_ready(&src_path, &class_path) {
return Ok(());
}
// Publish the source (idempotent) so cache inspectors can see what
// the class was built from.
std::fs::write(&src_path, WORKER_SOURCE)
.map_err(|e| format!("javac-pool: write worker source: {e}"))?;
// Compile into a private staging dir, then atomically rename the
// class into place.
let staging = dir.join(format!(".compile-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&staging);
std::fs::create_dir_all(&staging).map_err(|e| format!("javac-pool: mkdir staging: {e}"))?;
let javac = std::env::var("NYX_JAVAC_BIN").unwrap_or_else(|_| "javac".to_owned());
let output = Command::new(&javac)
let compiled = Command::new(&javac)
.arg("-d")
.arg(dir)
.arg(&staging)
.arg(&src_path)
.env_clear()
.env("PATH", std::env::var("PATH").unwrap_or_default())
.env("HOME", std::env::var("HOME").unwrap_or_default())
.output()
.map_err(|e| format!("javac-pool: spawn javac: {e}"))?;
.output();
let output = match compiled {
Ok(o) => o,
Err(e) => {
let _ = std::fs::remove_dir_all(&staging);
return Err(format!("javac-pool: spawn javac: {e}"));
}
};
if !output.status.success() {
let _ = std::fs::remove_dir_all(&staging);
return Err(format!(
"javac-pool: bootstrap compile failed: {}",
String::from_utf8_lossy(&output.stderr),
));
}
let staged_class = staging.join(format!("{WORKER_CLASS}.class"));
let publish = std::fs::rename(&staged_class, &class_path);
let _ = std::fs::remove_dir_all(&staging);
publish.map_err(|e| format!("javac-pool: publish class: {e}"))?;
Ok(())
}
/// True when `class_path` holds a non-empty compiled worker built from
/// the current embedded `WORKER_SOURCE`.
fn worker_class_ready(src_path: &Path, class_path: &Path) -> bool {
if std::fs::read_to_string(src_path).ok().as_deref() != Some(WORKER_SOURCE) {
return false;
}
std::fs::metadata(class_path)
.map(|m| m.is_file() && m.len() > 0)
.unwrap_or(false)
}
/// Cross-process advisory lock guarding the shared bootstrap dir's
/// compile-and-publish step. The held lock file lives at
/// `<dir>/.bootstrap.lock`; the `flock(2)` releases when the guard (and
/// thus the file) drops.
struct BootstrapLock {
_file: File,
}
impl BootstrapLock {
fn acquire(dir: &Path) -> Result<Self, String> {
let lock_path = dir.join(".bootstrap.lock");
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&lock_path)
.map_err(|e| format!("javac-pool: open bootstrap lock: {e}"))?;
lock_file_exclusive(&file).map_err(|e| format!("javac-pool: bootstrap lock: {e}"))?;
Ok(BootstrapLock { _file: file })
}
}
#[cfg(unix)]
fn lock_file_exclusive(file: &File) -> std::io::Result<()> {
use std::os::fd::AsRawFd;
unsafe extern "C" {
fn flock(fd: i32, operation: i32) -> i32;
}
const LOCK_EX: i32 = 2;
loop {
// SAFETY: `file.as_raw_fd()` is a live fd owned by `file`; `flock`
// only reads the scalar args and we check the return value.
let ret = unsafe { flock(file.as_raw_fd(), LOCK_EX) };
if ret == 0 {
return Ok(());
}
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::Interrupted {
continue;
}
return Err(err);
}
}
#[cfg(not(unix))]
fn lock_file_exclusive(_file: &File) -> std::io::Result<()> {
Ok(())
}
@ -515,4 +621,43 @@ mod tests {
assert_eq!(decode_b64(encoded).as_deref(), Some(*raw));
}
}
#[test]
fn worker_class_ready_rejects_truncated_or_mismatched() {
let dir = tempfile::TempDir::new().unwrap();
let src = dir.path().join(WORKER_FILENAME);
let class = dir.path().join(format!("{WORKER_CLASS}.class"));
// Nothing on disk yet.
assert!(!worker_class_ready(&src, &class));
// Matching source but no class.
std::fs::write(&src, WORKER_SOURCE).unwrap();
assert!(!worker_class_ready(&src, &class));
// Matching source + a zero-byte class (the corruption shape a
// racing peer can leave behind) must not count as ready.
std::fs::write(&class, b"").unwrap();
assert!(!worker_class_ready(&src, &class));
// A non-empty class with matching source is ready.
std::fs::write(&class, b"\xca\xfe\xba\xbe").unwrap();
assert!(worker_class_ready(&src, &class));
// Stale source invalidates an otherwise-present class.
std::fs::write(&src, "// not the worker source").unwrap();
assert!(!worker_class_ready(&src, &class));
}
#[test]
fn bootstrap_lock_is_reentrant_across_sequential_acquires() {
// The flock is released when the guard drops, so back-to-back
// acquires from the same process succeed without deadlock.
let dir = tempfile::TempDir::new().unwrap();
{
let _g = BootstrapLock::acquire(dir.path()).expect("first acquire");
}
let _g = BootstrapLock::acquire(dir.path()).expect("second acquire");
assert!(dir.path().join(".bootstrap.lock").exists());
}
}

View file

@ -1388,15 +1388,24 @@ fn try_compile_java_with_toolchain(
if result.success {
return finalize_java_compile(workdir, cache_path, lib_on_cp);
}
if pool.is_healthy() {
// The compile itself failed (real source error) -- surface
// the worker's stderr verbatim.
return Err(result.stderr);
// The pooled compile failed. This is either a genuine source
// error -- which the deterministic direct-spawn `javac` path below
// reproduces identically -- or a transient pool fault: a worker
// crash, a response timeout when the host is saturated, or a
// `NyxJavacWorker.class` corrupted by a concurrent process racing
// on the shared bootstrap dir. The long-lived in-process compiler
// is a fast path, not the oracle for a `BuildFailed` verdict, so
// never surface a pooled failure verbatim -- always fall through
// and re-verify with direct-spawn `javac`. A real error fails
// there too (and we surface its authoritative stderr); a transient
// pool fault is absorbed and the build still succeeds. This is the
// load-bearing fix for flaky `Inconclusive(BuildFailed)` verdicts
// under heavy parallel test load.
if !pool.is_healthy() {
// Worker crashed: evict the cached pool so the next finding
// re-spawns a fresh worker instead of reusing a dead one.
drop_javac_pool(toolchain_id);
}
// Worker crashed: drop the cached pool so the next call
// re-spawns it, then fall through to the legacy direct-spawn
// path so this build still has a chance to succeed.
drop_javac_pool(toolchain_id);
}
let javac = std::env::var("NYX_JAVAC_BIN").unwrap_or_else(|_| "javac".to_owned());