This commit is contained in:
elipeter 2026-06-04 10:26:27 -05:00
parent 3edb17e60b
commit 969653735c
2 changed files with 166 additions and 29 deletions

View file

@ -1308,6 +1308,21 @@ fn parse_javac_major(text: &str) -> Option<u32> {
}
}
/// Report whether the stderr of a failed pooled compile
/// ([`PoolCompileResult::stderr`]) describes a transient pool fault
/// instead of a genuine source error.
///
/// The check is purely textual: the result is `true` exactly when
/// `stderr` begins with the literal `javac-pool:` marker. That marker is
/// what the non-compile failure paths of
/// [`JavacPool::compile_with_worker`] (worker unavailable, write/flush
/// error, closed/timed-out/disconnected pipe, malformed response) put at
/// the front of their stderr, whereas a real `javac` diagnostic never
/// starts with it.
///
/// Callers surface genuine errors verbatim and re-verify only transient
/// faults with a fresh direct-spawn `javac`. Because the marker travels
/// on the per-compile result value -- not on shared pool state that a
/// concurrent verify lane could mutate -- the classification is
/// race-free.
fn javac_pool_failure_is_transient(stderr: &str) -> bool {
    // Marker prepended by the Rust-side pool wrapper to every fault that
    // did not originate from `javac` itself.
    const POOL_FAULT_MARKER: &str = "javac-pool:";

    stderr.strip_prefix(POOL_FAULT_MARKER).is_some()
}
/// Compile every `.java` under `workdir`.
///
/// `toolchain_id` is threaded down so the pool path (when enabled) can
@ -1404,24 +1419,35 @@ fn try_compile_java_with_toolchain(
if result.success {
return finalize_java_compile(workdir, cache_path, lib_on_cp);
}
// The pooled compile failed. This is either a genuine source
// error -- which the deterministic direct-spawn `javac` path below
// reproduces identically -- or a transient pool fault: a worker
// crash, a response timeout when the host is saturated, or a
// `NyxJavacWorker.class` corrupted by a concurrent process racing
// on the shared bootstrap dir. The long-lived in-process compiler
// is a fast path, not the oracle for a `BuildFailed` verdict, so
// never surface a pooled failure verbatim -- always fall through
// and re-verify with direct-spawn `javac`. A real error fails
// there too (and we surface its authoritative stderr); a transient
// pool fault is absorbed and the build still succeeds. This is the
// load-bearing fix for flaky `Inconclusive(BuildFailed)` verdicts
// under heavy parallel test load.
if !pool.is_healthy() {
// Worker crashed: evict the cached pool so the next finding
// re-spawns a fresh worker instead of reusing a dead one.
drop_javac_pool(toolchain_id);
// The pooled compile failed. Classify the failure from *this
// compile's own result* -- never from shared pool state like
// `pool.is_healthy()`, which a concurrent verify lane sharing the
// Arc can re-heal between this failure and the check, misreading a
// transient fault as a genuine error (the race that made the old
// code re-verify *every* failure).
//
// A `javac-pool:` prefix is the Rust-side wrapper for a transient
// pool fault: worker unavailable, a write/flush/read error, a
// response timeout under host saturation, a closed pipe, or a
// malformed response (see `JavacPool::compile_with_worker`). Such
// a failure is not authoritative, so evict the worker and re-verify
// with direct-spawn `javac` below -- a real error fails there too,
// a transient fault is absorbed and the build still succeeds. This
// keeps the load-bearing fix for flaky `Inconclusive(BuildFailed)`
// verdicts under heavy parallel load.
//
// Anything else is the worker's own `javac` diagnostic for a
// genuine source error. Direct-spawn reproduces it identically, so
// surface it verbatim instead of paying a redundant full `javac`
// process spawn -- the dominant verify wall-clock cost across a
// large failing-build corpus (e.g. the OWASP servlet harnesses that
// build-fail in CI), enough to blow the gate's wall-clock budget.
if !javac_pool_failure_is_transient(&result.stderr) {
return Err(result.stderr);
}
// Transient pool fault: evict the cached pool so the next finding
// re-spawns a fresh worker instead of reusing a dead one.
drop_javac_pool(toolchain_id);
}
let javac = std::env::var("NYX_JAVAC_BIN").unwrap_or_else(|_| "javac".to_owned());
@ -2466,6 +2492,39 @@ mod tests {
assert_eq!(java_target_release(""), None);
}
/// Pins the contract of `javac_pool_failure_is_transient`: stderr that
/// starts with `javac-pool:` is a transient pool fault, while plain
/// `javac` output (including empty stderr) is a genuine failure.
#[test]
fn javac_pool_failure_classifier_separates_transient_from_genuine() {
    // Samples of the `javac-pool:`-prefixed stderr that
    // `JavacPool::compile_with_worker` emits for transient faults.
    const TRANSIENT_STDERRS: [&str; 7] = [
        "javac-pool: worker unavailable",
        "javac-pool: write failed: broken pipe",
        "javac-pool: flush failed: broken pipe",
        "javac-pool: worker closed stdout",
        "javac-pool: read response timed out after 30s",
        "javac-pool: read response reader disconnected",
        "javac-pool: malformed response: GARBAGE",
    ];
    // Real `javac` diagnostics (and the empty-stderr edge case). These
    // must fast-fail verbatim, so none may be classified as transient.
    const GENUINE_STDERRS: [&str; 4] = [
        "Broken.java:1: error: illegal start of expression\n",
        "Vuln.java:7: error: package javax.servlet does not exist\n",
        "1 error",
        "",
    ];

    for transient in TRANSIENT_STDERRS {
        let classified_transient = javac_pool_failure_is_transient(transient);
        assert!(
            classified_transient,
            "must re-verify transient fault via direct-spawn: {transient:?}",
        );
    }

    for genuine in GENUINE_STDERRS {
        let classified_transient = javac_pool_failure_is_transient(genuine);
        assert!(
            !classified_transient,
            "genuine compile error must fast-fail, not re-spawn javac: {genuine:?}",
        );
    }
}
#[test]
fn parse_javac_major_handles_version_schemes() {
assert_eq!(parse_javac_major("javac 17.0.9"), Some(17));

View file

@ -903,6 +903,39 @@ fn emit_message_handler(spec: &HarnessSpec, queue: &str, is_typescript: bool) ->
// Nyx dynamic harness — message handler (Phase 20 / Track M.2).
{probe}
// Force synchronous stdout/stderr. The optional real-broker probe below
// drives the AWS SDK, whose HTTP path lazy-instantiates the undici
// `llhttp` WebAssembly module; under the Linux process backend's
// RLIMIT_AS cap that instantiation can throw `RangeError: ... Out of
// memory: Cannot allocate Wasm memory`. Synchronous writes put the
// oracle markers the in-process loopback emits on the pipe the instant
// they are written, so a later fatal crash in that probe can never
// truncate them before `process.exit`.
(function _nyxForceSyncStdio() {{
try {{
const _fs = require('fs');
const _wrap = function (stream, fd) {{
stream.write = function (chunk, enc, cb) {{
if (typeof enc === 'function') {{ cb = enc; enc = undefined; }}
try {{
const buf = Buffer.isBuffer(chunk)
? chunk
: Buffer.from(String(chunk), enc || 'utf8');
_fs.writeSync(fd, buf);
}} catch (_e) {{}}
if (typeof cb === 'function') cb();
return true;
}};
}};
_wrap(process.stdout, 1);
_wrap(process.stderr, 2);
}} catch (_e) {{}}
}})();
// Set once any delivery path has dispatched the handler, so the loopback
// fallback and the crash handlers never double-dispatch or re-confirm.
let _nyxDispatched = false;
{sqs_src}
const payload = (process.env.NYX_PAYLOAD && process.env.NYX_PAYLOAD.length > 0)
@ -998,6 +1031,7 @@ async function _nyxDispatchEnvelope(envelope) {{
// gate requires this byte sequence on stdout / stderr.
process.stdout.write('__NYX_SINK_HIT__\n');
await Promise.resolve(_handler(envelope));
_nyxDispatched = true;
return true;
}} catch (e) {{
process.stderr.write('NYX_EXCEPTION: ' + (e.constructor ? e.constructor.name : 'Error') + ': ' + e.message + '\n');
@ -1005,20 +1039,64 @@ async function _nyxDispatchEnvelope(envelope) {{
}}
}}
(async () => {{
if (await _nyxTryRealSqs({queue:?}, payload)) return;
process.stdout.write({publish_marker:?} + ' ' + {queue:?} + '\n');
_nyxRecordBrokerPublish('NYX_SQS_LOG', {queue:?}, payload);
_broker.publish({queue:?}, payload);
for (const envelope of _broker.receiveMessage({queue:?}, 1)) {{
_nyxRecordBrokerEvent('NYX_SQS_LOG', 'deliver', {queue:?}, envelope.Body || '');
const ok = await _nyxDispatchEnvelope(envelope);
if (ok && _broker.deleteMessage({queue:?}, envelope.ReceiptHandle || '')) {{
_nyxRecordBrokerEvent('NYX_SQS_LOG', 'ack', {queue:?}, envelope.ReceiptHandle || '');
}} else {{
_broker.replayInflight();
// In-process SQS loopback — pure JS, no network or WebAssembly, so it
// runs under any sandbox memory cap. Dispatch is synchronous so it is
// usable from the crash handlers below (where the event loop must not be
// relied on). Idempotent: a no-op once any path has dispatched.
function _nyxLoopbackOnce() {{
if (_nyxDispatched) return;
try {{
process.stdout.write({publish_marker:?} + ' ' + {queue:?} + '\n');
_nyxRecordBrokerPublish('NYX_SQS_LOG', {queue:?}, payload);
_broker.publish({queue:?}, payload);
for (const envelope of _broker.receiveMessage({queue:?}, 1)) {{
_nyxRecordBrokerEvent('NYX_SQS_LOG', 'deliver', {queue:?}, envelope.Body || '');
process.stdout.write('__NYX_SINK_HIT__\n');
try {{
_handler(envelope);
_nyxDispatched = true;
}} catch (e) {{
process.stderr.write('NYX_EXCEPTION: ' + (e && e.constructor ? e.constructor.name : 'Error') + ': ' + (e && e.message ? e.message : String(e)) + '\n');
}}
if (_broker.deleteMessage({queue:?}, envelope.ReceiptHandle || '')) {{
_nyxRecordBrokerEvent('NYX_SQS_LOG', 'ack', {queue:?}, envelope.ReceiptHandle || '');
}} else {{
_broker.replayInflight();
}}
}}
}} catch (e) {{
process.stderr.write('NYX_LOOPBACK_ERROR: ' + (e && e.message ? e.message : String(e)) + '\n');
}}
}}
// The optional real-broker probe drives the AWS SDK, whose undici WASM
// path can throw a fatal `RangeError` under the sandbox RLIMIT_AS cap and
// surface as an unhandled rejection / uncaught exception that would
// otherwise abort the process before the loopback runs. Neutralise it:
// fall back to the in-process loopback (which fully confirms the sink),
// then exit cleanly so the already-synchronous markers are delivered.
process.on('uncaughtException', function (e) {{
process.stderr.write('NYX_UNCAUGHT: ' + (e && e.message ? e.message : String(e)) + '\n');
_nyxLoopbackOnce();
process.exit(0);
}});
process.on('unhandledRejection', function (e) {{
process.stderr.write('NYX_UNHANDLED: ' + (e && e.message ? e.message : String(e)) + '\n');
_nyxLoopbackOnce();
process.exit(0);
}});
(async () => {{
// Prefer the real broker for fidelity, but never let its failure
// (including a fatal undici-WASM OOM) prevent the in-process
// confirmation below.
let real = false;
try {{
real = await _nyxTryRealSqs({queue:?}, payload);
}} catch (e) {{
process.stderr.write('NYX_REAL_SQS_FALLBACK: ' + (e && e.message ? e.message : String(e)) + '\n');
}}
if (!real) _nyxLoopbackOnce();
}})();
"#,
handler = handler,