refactor(scan): implement IndexWriteQueue for single-writer SQLite handling, introduce ReproEnvGuard for safer environment variable management, and refactor tests to enhance isolation and determinism

This commit is contained in:
elipeter 2026-05-28 11:08:59 -05:00
parent 71fade1d83
commit c3a1550315
20 changed files with 2025 additions and 213 deletions

View file

@ -0,0 +1,545 @@
//! Long-lived `javac` daemon (Phase 22 / Track O.0).
//!
//! The legacy [`crate::dynamic::build_sandbox::try_compile_java`] shell-execs a
//! fresh `javac` per harness — every invocation pays the JVM cold-start tax
//! (~700ms on the macOS reference machine, ~300ms on Linux CI). At 50
//! findings per OWASP-scale run that single line burns > 30s before any
//! real work happens.
//!
//! [`JavacPool`] replaces the shell-exec with a long-running worker JVM:
//!
//! ```text
//! nyx ─┐
//! │ framed JSON ┌─────────────┐
//! ├──stdin──────► │ NyxJavac │
//! │ │ Worker │
//! │ ◄──stdout──── │ (live JVM) │
//! │ framed JSON └─────────────┘
//! ```
//!
//! Bootstrap (paid once per toolchain id):
//! 1. Drop `NyxJavacWorker.java` into a cache dir.
//! 2. Compile it with `javac` (~1s).
//! 3. Spawn `java -cp <dir> NyxJavacWorker` (~700ms cold start).
//! 4. Read the worker's `{"ready":true}` banner.
//!
//! After bootstrap, each [`JavacPool::compile_batch`] is a single JSON
//! round-trip — typical wall-clock < 50ms even on small harnesses.
//!
//! # Robustness
//!
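//! A crashed worker is non-fatal:
//! - On any IO error or malformed response, the pool marks itself
//!   unhealthy and the caller falls back to the direct-spawn legacy path.
//! - The next pool lookup spawns a fresh worker.
//!
//! A *hung* worker is not detected yet: the banner read and the
//! per-request response read are plain blocking `read_line` calls with
//! no deadline.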
//!
//! # Test hook
//!
//! `NYX_JAVAC_BIN` + `NYX_JAVA_BIN` override the binaries the pool
//! invokes so integration tests can swap in a wrapper.
use super::{BuildPool, PoolCompileResult};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
use std::sync::Mutex;
use std::time::{Duration, Instant};
/// Java source compiled at first use to drive the worker. Embedded at
/// build time so the on-disk copy can be compared against it.
const WORKER_SOURCE: &str = include_str!("java_worker/NyxJavacWorker.java");
/// Main class handed to `java` when launching the worker; also the stem
/// of the `.class` file expected in the bootstrap dir.
const WORKER_CLASS: &str = "NyxJavacWorker";
/// File name under which [`WORKER_SOURCE`] is written into the bootstrap dir.
const WORKER_FILENAME: &str = "NyxJavacWorker.java";
/// Live worker handle. Held inside a `Mutex` so concurrent
/// `compile_batch` callers serialise on the single JVM.
struct Worker {
/// Process handle of the worker JVM; used to kill / wait on it.
child: Child,
/// Write end of the worker's stdin: one newline-terminated JSON
/// request is written per compile.
stdin: ChildStdin,
/// Buffered read end of the worker's stdout: the readiness banner and
/// then one response line per request are read from here.
stdout: BufReader<ChildStdout>,
/// Id stamped on the next request; advanced with wrapping arithmetic.
next_id: u64,
}
/// Build pool backed by one long-lived `javac` worker JVM.
///
/// Constructed with [`JavacPool::try_new`]; compile requests are
/// serialised through the `inner` mutex.
pub struct JavacPool {
/// `None` when the worker has crashed and a future call should
/// surface the unhealthy state to the dispatcher.
inner: Mutex<Option<Worker>>,
/// Cache dir holding `NyxJavacWorker.class`. Persisted between
/// runs so subsequent process invocations skip the compile step.
/// Also reused as the classpath when a replacement worker is spawned.
bootstrap_dir: PathBuf,
}
impl JavacPool {
/// Create a fresh pool for `toolchain_id`.
///
/// Returns `Err` when the worker cannot be bootstrapped (missing
/// `javac`, missing `java`, compile failure, spawn failure). The
/// caller is expected to fall back to the legacy direct-spawn path
/// on any error.
pub fn try_new(toolchain_id: &str) -> Result<Self, String> {
let bootstrap_dir = bootstrap_dir_for(toolchain_id)?;
std::fs::create_dir_all(&bootstrap_dir)
.map_err(|e| format!("javac-pool: mkdir {}: {e}", bootstrap_dir.display()))?;
ensure_worker_compiled(&bootstrap_dir)?;
let worker = spawn_worker(&bootstrap_dir)?;
Ok(JavacPool {
inner: Mutex::new(Some(worker)),
bootstrap_dir,
})
}
fn compile_with_worker(&self, workdir: &Path, args: &[String]) -> PoolCompileResult {
let start = Instant::now();
let mut guard = match self.inner.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
// If a prior call torched the worker, try one re-spawn here so
// the caller doesn't see consecutive failures from a transient
// JVM crash.
if guard.is_none() {
if let Ok(w) = spawn_worker(&self.bootstrap_dir) {
*guard = Some(w);
}
}
let worker = match guard.as_mut() {
Some(w) => w,
None => {
return PoolCompileResult {
success: false,
stderr: "javac-pool: worker unavailable".to_owned(),
duration: start.elapsed(),
};
}
};
let id = worker.next_id;
worker.next_id = worker.next_id.wrapping_add(1);
let req = build_request(id, workdir, args);
if let Err(e) = worker.stdin.write_all(req.as_bytes()) {
*guard = None;
return PoolCompileResult {
success: false,
stderr: format!("javac-pool: write failed: {e}"),
duration: start.elapsed(),
};
}
if let Err(e) = worker.stdin.flush() {
*guard = None;
return PoolCompileResult {
success: false,
stderr: format!("javac-pool: flush failed: {e}"),
duration: start.elapsed(),
};
}
let mut line = String::new();
match worker.stdout.read_line(&mut line) {
Ok(0) => {
*guard = None;
PoolCompileResult {
success: false,
stderr: "javac-pool: worker closed stdout".to_owned(),
duration: start.elapsed(),
}
}
Err(e) => {
*guard = None;
PoolCompileResult {
success: false,
stderr: format!("javac-pool: read failed: {e}"),
duration: start.elapsed(),
}
}
Ok(_) => match parse_response(&line) {
Some((success, stderr)) => PoolCompileResult {
success,
stderr,
duration: start.elapsed(),
},
None => {
*guard = None;
PoolCompileResult {
success: false,
stderr: format!("javac-pool: malformed response: {line}"),
duration: start.elapsed(),
}
}
},
}
}
}
impl Drop for JavacPool {
fn drop(&mut self) {
// Best-effort: close stdin so the worker exits cleanly, then
// wait briefly. We don't propagate errors -- pool teardown
// happens at process exit, by which point everyone is already
// leaving anyway.
if let Ok(mut guard) = self.inner.lock()
&& let Some(mut worker) = guard.take()
{
// Dropping stdin sends EOF to the worker's `readLine` loop.
drop(worker.stdin);
let _ = worker.child.wait();
}
}
}
impl BuildPool for JavacPool {
    /// Identifier reported for this pool.
    fn name(&self) -> &'static str {
        "javac"
    }

    /// Forward the request to the worker JVM.
    fn compile_batch(&self, workdir: &Path, args: &[String]) -> PoolCompileResult {
        Self::compile_with_worker(self, workdir, args)
    }

    /// `true` while a worker handle is present; a poisoned lock counts
    /// as unhealthy.
    fn is_healthy(&self) -> bool {
        self.inner
            .lock()
            .map(|slot| slot.is_some())
            .unwrap_or(false)
    }
}
/// Resolve the directory that holds the worker source/class for
/// `toolchain_id`.
///
/// `NYX_BUILD_POOL_DIR`, when set, is used as the root
/// (`<root>/javac/<toolchain_id>`); otherwise the platform cache dir is
/// used (`<cache>/dynamic/build-pool/javac/<toolchain_id>`). Fails when
/// the platform exposes no cache directory.
fn bootstrap_dir_for(toolchain_id: &str) -> Result<PathBuf, String> {
    match std::env::var("NYX_BUILD_POOL_DIR") {
        Ok(root) => Ok(PathBuf::from(root).join("javac").join(toolchain_id)),
        Err(_) => {
            let dirs = directories::ProjectDirs::from("dev", "nyx", "nyx")
                .ok_or_else(|| "javac-pool: no cache dir on this platform".to_owned())?;
            let mut path = dirs.cache_dir().to_path_buf();
            path.extend(["dynamic", "build-pool", "javac", toolchain_id]);
            Ok(path)
        }
    }
}
/// Drop `NyxJavacWorker.java` + compile `NyxJavacWorker.class` into
/// `dir` if they are not already present. Always re-writes the source
/// when the on-disk copy differs from the embedded one so a binary
/// upgrade picks up worker fixes without manual cache eviction.
///
/// # Errors
///
/// Returns a `javac-pool: …` message when the stale class cannot be
/// removed, the source cannot be written, `javac` cannot be spawned, or
/// the bootstrap compile exits unsuccessfully.
fn ensure_worker_compiled(dir: &Path) -> Result<(), String> {
    let src_path = dir.join(WORKER_FILENAME);
    let class_path = dir.join(format!("{WORKER_CLASS}.class"));

    let source_is_current =
        std::fs::read_to_string(&src_path).is_ok_and(|on_disk| on_disk == WORKER_SOURCE);
    if !source_is_current {
        // Evict the old class *before* refreshing the source. Done the
        // other way round, an interruption between the two steps would
        // leave an up-to-date source next to a stale class, and the
        // stale class would be trusted on every later run.
        match std::fs::remove_file(&class_path) {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => return Err(format!("javac-pool: remove stale worker class: {e}")),
        }
        std::fs::write(&src_path, WORKER_SOURCE)
            .map_err(|e| format!("javac-pool: write worker source: {e}"))?;
    }
    if class_path.exists() {
        return Ok(());
    }

    let javac = std::env::var("NYX_JAVAC_BIN").unwrap_or_else(|_| "javac".to_owned());
    let output = Command::new(&javac)
        .arg("-d")
        .arg(dir)
        .arg(&src_path)
        // Scrubbed environment: only PATH and HOME are forwarded.
        // `var_os` keeps values that are not valid UTF-8 instead of
        // silently replacing them with an empty string.
        .env_clear()
        .env("PATH", std::env::var_os("PATH").unwrap_or_default())
        .env("HOME", std::env::var_os("HOME").unwrap_or_default())
        .output()
        .map_err(|e| format!("javac-pool: spawn javac: {e}"))?;
    if !output.status.success() {
        return Err(format!(
            "javac-pool: bootstrap compile failed: {}",
            String::from_utf8_lossy(&output.stderr),
        ));
    }
    Ok(())
}
/// Launch the worker JVM with `dir` on its classpath and wait for its
/// readiness banner.
///
/// # Errors
///
/// Returns a `javac-pool: …` message when `java` cannot be spawned, a
/// stdio pipe is missing, the banner cannot be read, or the first line
/// is not the `{"ready":true}` banner. On every failure after the
/// spawn, the child is killed and reaped before returning.
fn spawn_worker(dir: &Path) -> Result<Worker, String> {
    /// Kill and reap a child that failed bootstrap so it does not
    /// linger as a stray process or zombie.
    fn abandon(child: &mut Child) {
        let _ = child.kill();
        let _ = child.wait();
    }

    let java = std::env::var("NYX_JAVA_BIN").unwrap_or_else(|_| "java".to_owned());
    let mut child = Command::new(&java)
        // The worker is tiny -- keep the JVM frugal so the pool
        // overhead stays well below the per-finding cost it
        // replaces.
        .arg("-Xss256k")
        .arg("-XX:+UseSerialGC")
        .arg("-cp")
        .arg(dir)
        .arg(WORKER_CLASS)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        // Scrubbed environment: only PATH and HOME are forwarded
        // (`var_os` so non-UTF-8 values survive).
        .env_clear()
        .env("PATH", std::env::var_os("PATH").unwrap_or_default())
        .env("HOME", std::env::var_os("HOME").unwrap_or_default())
        .spawn()
        .map_err(|e| format!("javac-pool: spawn java: {e}"))?;

    let Some(stdin) = child.stdin.take() else {
        abandon(&mut child);
        return Err("javac-pool: missing stdin".to_owned());
    };
    let Some(stdout) = child.stdout.take() else {
        abandon(&mut child);
        return Err("javac-pool: missing stdout".to_owned());
    };
    let mut stdout = BufReader::new(stdout);

    // Read the banner line. A 10s budget is passed along, but note that
    // `read_line_with_timeout` currently performs a plain blocking read
    // and does not enforce it.
    let banner = match read_line_with_timeout(&mut stdout, Duration::from_secs(10)) {
        Ok(line) => line,
        Err(e) => {
            abandon(&mut child);
            return Err(e);
        }
    };
    if !banner.contains("\"ready\":true") {
        // Order matters: `drain_stderr` reads until EOF, which only
        // arrives once the child has let go of its stderr. Close stdin
        // and kill first, *then* collect the diagnostics, then reap.
        drop(stdin);
        let _ = child.kill();
        let stderr_tail = drain_stderr(&mut child);
        let _ = child.wait();
        return Err(format!(
            "javac-pool: worker did not announce readiness; got {banner:?}; stderr: {stderr_tail}",
        ));
    }
    Ok(Worker {
        child,
        stdin,
        stdout,
        next_id: 0,
    })
}
/// Take the child's stderr pipe (if still attached) and return whatever
/// text can be read from it.
///
/// Read errors are ignored and yield the text gathered so far. The read
/// runs to end-of-stream, so it blocks until the pipe is closed on the
/// child's side.
fn drain_stderr(child: &mut Child) -> String {
    use std::io::Read;
    let Some(mut pipe) = child.stderr.take() else {
        return String::new();
    };
    let mut collected = String::new();
    // Best-effort: a failed read simply leaves `collected` as it is.
    let _ = pipe.read_to_string(&mut collected);
    collected
}
/// Read one line (the worker banner) from `stdout`.
///
/// NOTE(review): despite the name, `timeout` is accepted but not
/// enforced -- `BufReader` exposes no deadline primitive, so this is a
/// plain blocking `read_line`. No watchdog is visible in this file; if
/// `java` never prints a line and never closes stdout, this call does
/// not return. The parameter is kept so a real deadline can be added
/// later without churning call sites.
///
/// Returns the line including its terminator (empty on EOF), or a
/// `javac-pool: read banner: …` message on IO failure.
fn read_line_with_timeout(
    stdout: &mut BufReader<ChildStdout>,
    timeout: Duration,
) -> Result<String, String> {
    let _ = timeout;
    let mut banner = String::new();
    match stdout.read_line(&mut banner) {
        Ok(_) => Ok(banner),
        Err(e) => Err(format!("javac-pool: read banner: {e}")),
    }
}
/// Serialise one compile request as a newline-terminated JSON object:
/// `{"id":"<id>","cwd":<workdir>,"args":[…]}`.
///
/// `workdir` is rendered lossily when it is not valid UTF-8; strings
/// are escaped by `append_json_string`.
fn build_request(id: u64, workdir: &Path, args: &[String]) -> String {
    let capacity = 128 + args.iter().map(|arg| arg.len() + 4).sum::<usize>();
    let mut req = String::with_capacity(capacity);

    req.push_str("{\"id\":\"");
    req.push_str(&id.to_string());
    req.push_str("\",\"cwd\":");
    append_json_string(&mut req, &workdir.to_string_lossy());

    req.push_str(",\"args\":[");
    if let Some((head, tail)) = args.split_first() {
        append_json_string(&mut req, head);
        for arg in tail {
            req.push(',');
            append_json_string(&mut req, arg);
        }
    }
    req.push_str("]}\n");
    req
}
/// Append `s` to `out` as a double-quoted JSON string literal.
///
/// Backslash, double quote, `\n`, `\r` and `\t` get their two-character
/// escapes; any other character below U+0020 is written as `\u00XX`;
/// everything else is copied through unchanged.
fn append_json_string(out: &mut String, s: &str) {
    out.push('"');
    for ch in s.chars() {
        let short_escape = match ch {
            '\\' => Some("\\\\"),
            '"' => Some("\\\""),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        if let Some(escape) = short_escape {
            out.push_str(escape);
        } else if u32::from(ch) < 0x20 {
            out.push_str(&format!("\\u{:04x}", u32::from(ch)));
        } else {
            out.push(ch);
        }
    }
    out.push('"');
}
/// Pull `(success, stderr)` out of one worker response line.
///
/// The worker only ever emits
/// `{"id":"N","success":TRUE|FALSE,"stderr_b64":"…"}`, so a targeted
/// decoder is used instead of pulling in `serde_json`. A line without a
/// readable `success` field yields `None` (the caller then flags the
/// worker unhealthy). A missing `stderr_b64` is treated as empty, and a
/// payload that fails to decode is replaced by a placeholder string.
fn parse_response(line: &str) -> Option<(bool, String)> {
    let success = extract_bool_field(line, "success")?;
    let decoded = match extract_string_field(line, "stderr_b64") {
        Some(encoded) => decode_b64(&encoded),
        None => decode_b64(""),
    };
    let stderr = decoded.unwrap_or_else(|| "<unable to decode stderr>".to_owned());
    Some((success, stderr))
}
/// Find the first `"name":` in `s` and read the boolean literal that
/// follows it (leading whitespace allowed).
///
/// Returns `None` when the key is absent or the value does not start
/// with `true` / `false`.
fn extract_bool_field(s: &str, name: &str) -> Option<bool> {
    let key = format!("\"{name}\":");
    let (_, after_key) = s.split_once(key.as_str())?;
    let value = after_key.trim_start();
    match (value.starts_with("true"), value.starts_with("false")) {
        (true, _) => Some(true),
        (_, true) => Some(false),
        _ => None,
    }
}
/// Find the first `"name":"` in `s` and decode the JSON string literal
/// that follows it.
///
/// All JSON escapes are handled. `\uXXXX` requires exactly four hex
/// digits, and a high surrogate must be followed by a `\uXXXX` low
/// surrogate; the pair is combined into one character.
///
/// Returns `None` when the key is absent, the string is unterminated,
/// or an escape is malformed (unknown escape letter, non-hex digit,
/// lone surrogate).
fn extract_string_field(s: &str, name: &str) -> Option<String> {
    /// Read exactly four hex digits. `to_digit` is used rather than
    /// `from_str_radix` because the latter also accepts a leading `+`.
    fn hex4(chars: &mut std::str::Chars<'_>) -> Option<u32> {
        let mut value = 0u32;
        for _ in 0..4 {
            value = value * 16 + chars.next()?.to_digit(16)?;
        }
        Some(value)
    }

    let needle = format!("\"{name}\":\"");
    let start = s.find(&needle)? + needle.len();
    let mut chars = s[start..].chars();
    let mut out = String::new();

    while let Some(c) = chars.next() {
        match c {
            '"' => return Some(out),
            '\\' => match chars.next()? {
                '"' => out.push('"'),
                '\\' => out.push('\\'),
                '/' => out.push('/'),
                'b' => out.push('\u{08}'),
                'f' => out.push('\u{0c}'),
                'n' => out.push('\n'),
                'r' => out.push('\r'),
                't' => out.push('\t'),
                'u' => {
                    let first = hex4(&mut chars)?;
                    let code_point = if (0xD800..0xDC00).contains(&first) {
                        // High surrogate: JSON spells characters beyond
                        // the BMP as a `\uHHHH\uLLLL` pair.
                        if chars.next()? != '\\' || chars.next()? != 'u' {
                            return None;
                        }
                        let second = hex4(&mut chars)?;
                        if !(0xDC00..0xE000).contains(&second) {
                            return None;
                        }
                        0x10000 + ((first - 0xD800) << 10) + (second - 0xDC00)
                    } else {
                        first
                    };
                    // A lone low surrogate is rejected here.
                    out.push(char::from_u32(code_point)?);
                }
                _ => return None,
            },
            c => out.push(c),
        }
    }
    // Ran out of input before the closing quote.
    None
}
/// Tiny RFC 4648 base64 decoder. Used only for the worker's
/// `stderr_b64` field so we can carry raw bytes through the JSON
/// envelope without dragging in a base64 crate.
///
/// ASCII whitespace is ignored. Padding is only accepted as one or two
/// trailing `=` that complete a four-character group; an unpadded tail
/// of two or three characters is also accepted. Returns `None` for any
/// character outside the standard alphabet, misplaced padding, a
/// dangling single character, or decoded bytes that are not valid UTF-8.
fn decode_b64(s: &str) -> Option<String> {
    /// Map one alphabet character to its 6-bit value.
    fn sextet(b: u8) -> Option<u32> {
        match b {
            b'A'..=b'Z' => Some(u32::from(b - b'A')),
            b'a'..=b'z' => Some(u32::from(b - b'a') + 26),
            b'0'..=b'9' => Some(u32::from(b - b'0') + 52),
            b'+' => Some(62),
            b'/' => Some(63),
            _ => None,
        }
    }

    let bytes: Vec<u8> = s.bytes().filter(|b| !b.is_ascii_whitespace()).collect();

    // Split off trailing padding; any `=` left inside `data` is then
    // rejected by `sextet` like every other non-alphabet character.
    let data_len = bytes.iter().rposition(|&b| b != b'=').map_or(0, |i| i + 1);
    let pad = bytes.len() - data_len;
    if pad > 2 || (pad > 0 && bytes.len() % 4 != 0) {
        return None;
    }
    let data = &bytes[..data_len];
    // A single leftover character carries only 6 bits -- not a byte.
    if data.len() % 4 == 1 {
        return None;
    }

    let mut out = Vec::with_capacity(data.len() / 4 * 3 + 2);
    for chunk in data.chunks(4) {
        let mut triple = 0u32;
        for &b in chunk {
            triple = (triple << 6) | sextet(b)?;
        }
        // Left-align a short final group so the byte extraction below
        // is uniform; a group of n characters yields n - 1 bytes.
        triple <<= 6 * (4 - chunk.len());
        out.push(((triple >> 16) & 0xff) as u8);
        if chunk.len() > 2 {
            out.push(((triple >> 8) & 0xff) as u8);
        }
        if chunk.len() > 3 {
            out.push((triple & 0xff) as u8);
        }
    }
    String::from_utf8(out).ok()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Quotes and backslashes in args survive as JSON escapes and the
    /// envelope ends with the frame terminator.
    #[test]
    fn request_envelope_escapes_specials() {
        let args = [String::from("a\"b"), String::from("c\\d")];
        let envelope = build_request(7, Path::new("/tmp/x"), &args);
        assert!(envelope.contains(r#""id":"7""#));
        assert!(envelope.contains(r#""cwd":"/tmp/x""#));
        assert!(envelope.contains(r#""a\"b""#));
        assert!(envelope.contains(r#""c\\d""#));
        assert!(envelope.ends_with("]}\n"));
    }

    /// A successful response with an empty payload decodes to `(true, "")`.
    #[test]
    fn parse_response_success() {
        let line = "{\"id\":\"0\",\"success\":true,\"stderr_b64\":\"\"}\n";
        let (ok, err) = parse_response(line).unwrap();
        assert!(ok);
        assert!(err.is_empty());
    }

    /// The base64 payload of a failed compile is decoded.
    #[test]
    fn parse_response_failure_decodes_stderr() {
        // "boom" -> base64 "Ym9vbQ=="
        let line = "{\"id\":\"1\",\"success\":false,\"stderr_b64\":\"Ym9vbQ==\"}\n";
        let (ok, err) = parse_response(line).unwrap();
        assert!(!ok);
        assert_eq!(err, "boom");
    }

    /// Lines without a `success` field are rejected.
    #[test]
    fn parse_response_rejects_off_shape() {
        assert!(parse_response("not json").is_none());
        // Missing success field.
        assert!(parse_response("{\"id\":\"0\",\"stderr_b64\":\"\"}").is_none());
    }

    /// Padded inputs of every tail length decode to the expected text.
    #[test]
    fn b64_decode_roundtrip() {
        let cases = [
            ("", ""),
            ("a", "YQ=="),
            ("ab", "YWI="),
            ("abc", "YWJj"),
            ("hello world", "aGVsbG8gd29ybGQ="),
        ];
        for (raw, encoded) in &cases {
            assert_eq!(decode_b64(encoded).as_deref(), Some(*raw));
        }
    }

    /// Escaped quote and backslash inside a string field are unescaped.
    #[test]
    fn extract_string_handles_escapes() {
        let line = r#"{"id":"0","stderr_b64":"abc","note":"a\"b\\c"}"#;
        let note = extract_string_field(line, "note");
        assert_eq!(note.as_deref(), Some(r#"a"b\c"#));
    }

    /// Each key resolves to its own value, not to a neighbour's.
    #[test]
    fn extract_bool_picks_first_match() {
        let line = r#"{"success":false,"other":true}"#;
        assert_eq!(extract_bool_field(line, "success"), Some(false));
        assert_eq!(extract_bool_field(line, "other"), Some(true));
    }
}

View file

@ -0,0 +1,256 @@
// SPDX-License-Identifier: GPL-3.0-or-later
//
// Long-lived javac worker bundled with nyx-scanner. The Rust pool side
// compiles + spawns this once per toolchain id; subsequent harness
// compiles run in-process via ToolProvider#getSystemJavaCompiler so the
// JVM cold-start cost is amortised across every harness in a verify run.
//
// Wire format: newline-terminated UTF-8 JSON, one request per line:
// {"id":"0","cwd":"/path/to/workdir","args":["-d","/tmp/x","Foo.java"]}\n
//
// Response: newline-terminated UTF-8 JSON, one per request:
// {"id":"0","success":true,"stderr_b64":"<base64 of javac stderr>"}\n
//
// stderr is base64-encoded so it never embeds raw newlines or quotes
// inside the JSON envelope -- keeps the parser on both sides tiny.
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
public class NyxJavacWorker {
    /**
     * Worker entry point: print the readiness banner, then answer one
     * JSON response line per JSON request line until stdin reaches EOF.
     *
     * Exits with status 2 before printing the banner when no system
     * Java compiler is available.
     */
    public static void main(String[] argv) throws Exception {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            // JRE without javac (rare on dev boxes, possible on slim CI
            // images). Signal the Rust side so it falls back to the
            // direct-spawn legacy path.
            System.err.println("nyx-javac-worker: no system Java compiler (JRE-only install?)");
            System.exit(2);
        }
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
        PrintStream out = new PrintStream(System.out, true, StandardCharsets.UTF_8);
        // Banner line. The Rust side reads this first so it knows the
        // worker is live before it queues any compile requests.
        out.println("{\"ready\":true}");
        out.flush();

        String line;
        while ((line = in.readLine()) != null) {
            line = line.trim();
            if (line.isEmpty()) continue;

            Request req;
            try {
                req = parse(line);
            } catch (Throwable t) {
                // Malformed request -- emit an error response keyed on
                // an empty id so the Rust side can at least surface it.
                writeResponse(out, "", false,
                        ("nyx-javac-worker: parse error: " + t.getMessage()).getBytes(StandardCharsets.UTF_8));
                continue;
            }

            ByteArrayOutputStream errBuf = new ByteArrayOutputStream();
            PrintStream errStream = new PrintStream(errBuf, true, StandardCharsets.UTF_8);
            int rc;
            try {
                String[] args = req.args.toArray(new String[0]);
                if (req.cwd != null && !req.cwd.isEmpty()) {
                    // The JDK compiler API has no per-task cwd switch.
                    // The harness build already supplies absolute paths
                    // via the Rust side; user.dir is still set
                    // defensively, with the intent that relative
                    // -d / -cp / source-path entries resolve against
                    // the requested workdir rather than the worker
                    // JVM's launch directory.
                    System.setProperty("user.dir", req.cwd);
                }
                // Never pass null for the compiler's stdout: null means
                // System.out, which is the framed protocol channel.
                // Anything javac (or an annotation processor) prints
                // there would be read by the Rust side as a response
                // line. Fold it into the captured diagnostics instead.
                rc = compiler.run(null, errStream, errStream, args);
            } catch (Throwable t) {
                t.printStackTrace(errStream);
                rc = 1;
            }
            // Make sure everything written so far has reached errBuf.
            errStream.flush();
            boolean success = (rc == 0);
            writeResponse(out, req.id, success, errBuf.toByteArray());
        }
    }

    /**
     * Emit one response line:
     * {"id":<id>,"success":<bool>,"stderr_b64":"<base64 of stderr>"}.
     */
    private static void writeResponse(PrintStream out, String id, boolean success, byte[] stderr) {
        String b64 = Base64.getEncoder().encodeToString(stderr);
        StringBuilder sb = new StringBuilder(64 + b64.length());
        sb.append("{\"id\":");
        appendJsonString(sb, id);
        sb.append(",\"success\":").append(success);
        sb.append(",\"stderr_b64\":\"").append(b64).append("\"}");
        out.println(sb);
        out.flush();
    }

    /** Append {@code s} as a double-quoted, escaped JSON string literal. */
    private static void appendJsonString(StringBuilder sb, String s) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '\\': sb.append("\\\\"); break;
                case '"': sb.append("\\\""); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        sb.append('"');
    }

    /** Decoded request; fields keep their defaults when a key is absent. */
    private static final class Request {
        String id = "";
        String cwd = "";
        List<String> args = new ArrayList<>();
    }

    /**
     * Parse one request object. Recognised keys are "id", "cwd" and
     * "args" (array of strings); any other key's value is skipped.
     * Throws RuntimeException on malformed input.
     */
    private static Request parse(String s) {
        Parser p = new Parser(s);
        Request r = new Request();
        p.skipWs();
        p.expect('{');
        p.skipWs();
        if (p.peek() == '}') {
            p.next();
            return r;
        }
        while (true) {
            p.skipWs();
            String key = p.parseString();
            p.skipWs();
            p.expect(':');
            p.skipWs();
            if (key.equals("id")) {
                r.id = p.parseString();
            } else if (key.equals("cwd")) {
                r.cwd = p.parseString();
            } else if (key.equals("args")) {
                p.expect('[');
                p.skipWs();
                if (p.peek() != ']') {
                    while (true) {
                        p.skipWs();
                        r.args.add(p.parseString());
                        p.skipWs();
                        if (p.peek() == ',') { p.next(); continue; }
                        break;
                    }
                }
                p.skipWs();
                p.expect(']');
            } else {
                skipValue(p);
            }
            p.skipWs();
            if (p.peek() == ',') { p.next(); continue; }
            break;
        }
        p.skipWs();
        p.expect('}');
        return r;
    }

    /**
     * Advance past one JSON value of any kind without retaining it.
     * Scalars (numbers, true/false/null) are skipped by character class
     * only; they are not validated.
     */
    private static void skipValue(Parser p) {
        p.skipWs();
        char c = p.peek();
        if (c == '"') {
            p.parseString();
        } else if (c == '[') {
            p.next();
            p.skipWs();
            if (p.peek() != ']') {
                while (true) {
                    skipValue(p);
                    p.skipWs();
                    if (p.peek() == ',') { p.next(); continue; }
                    break;
                }
            }
            p.skipWs();
            p.expect(']');
        } else if (c == '{') {
            p.next();
            p.skipWs();
            if (p.peek() != '}') {
                while (true) {
                    p.skipWs();
                    p.parseString();
                    p.skipWs();
                    p.expect(':');
                    skipValue(p);
                    p.skipWs();
                    if (p.peek() == ',') { p.next(); continue; }
                    break;
                }
            }
            p.skipWs();
            p.expect('}');
        } else {
            int start = p.pos;
            while (p.pos < p.s.length() && "0123456789.-+eEtrufalsn".indexOf(p.s.charAt(p.pos)) >= 0) {
                p.pos++;
            }
            if (p.pos == start) {
                throw new RuntimeException("bad value at " + p.pos);
            }
        }
    }

    /** Minimal cursor over the request text. */
    private static final class Parser {
        final String s;
        int pos = 0;

        Parser(String s) { this.s = s; }

        /** Current character; fails with a readable message at end of input. */
        char peek() {
            if (pos >= s.length()) {
                throw new RuntimeException("unexpected end of input at " + pos + " of " + s);
            }
            return s.charAt(pos);
        }

        /** Current character, then advance. */
        char next() {
            char c = peek();
            pos++;
            return c;
        }

        void skipWs() {
            while (pos < s.length() && Character.isWhitespace(s.charAt(pos))) pos++;
        }

        void expect(char c) {
            if (pos >= s.length() || s.charAt(pos) != c) {
                throw new RuntimeException("expected '" + c + "' at " + pos + " of " + s);
            }
            pos++;
        }

        /** Parse a double-quoted JSON string starting at the cursor. */
        String parseString() {
            expect('"');
            StringBuilder sb = new StringBuilder();
            while (pos < s.length()) {
                char c = s.charAt(pos++);
                if (c == '"') return sb.toString();
                if (c == '\\') {
                    if (pos >= s.length()) {
                        throw new RuntimeException("unterminated string");
                    }
                    char e = s.charAt(pos++);
                    switch (e) {
                        case '"': sb.append('"'); break;
                        case '\\': sb.append('\\'); break;
                        case '/': sb.append('/'); break;
                        case 'b': sb.append('\b'); break;
                        case 'f': sb.append('\f'); break;
                        case 'n': sb.append('\n'); break;
                        case 'r': sb.append('\r'); break;
                        case 't': sb.append('\t'); break;
                        case 'u': {
                            if (pos + 4 > s.length()) {
                                throw new RuntimeException("truncated \\u escape at " + pos);
                            }
                            // Digit-by-digit so signs ("+041", "-001"),
                            // which Integer.parseInt would accept, are
                            // rejected.
                            int unit = 0;
                            for (int i = 0; i < 4; i++) {
                                int d = Character.digit(s.charAt(pos + i), 16);
                                if (d < 0) {
                                    throw new RuntimeException("bad \\u escape at " + pos);
                                }
                                unit = (unit << 4) | d;
                            }
                            pos += 4;
                            // Appended as a UTF-16 code unit, so a
                            // surrogate pair written as two escapes
                            // recombines in the resulting String.
                            sb.append((char) unit);
                            break;
                        }
                        default: throw new RuntimeException("bad escape \\" + e);
                    }
                } else {
                    sb.append(c);
                }
            }
            throw new RuntimeException("unterminated string");
        }
    }
}

View file

@ -0,0 +1,165 @@
//! Build pools: long-lived compiler / toolchain daemons shared across many
//! per-finding harness builds.
//!
//! The naive `prepare_*` path in [`crate::dynamic::build_sandbox`] spawns a
//! fresh `javac` / `tsc` / `cargo build` subprocess for every finding the
//! verifier touches. Cold-start dominates the cost: `javac` alone burns
//! ~700ms before it has read a single source. A 50-harness OWASP run pays
//! that 50× — > 30s of pure JVM startup.
//!
//! A `BuildPool` is a long-running worker process (or in-process service)
//! that compiles batches of harness sources in a single toolchain instance.
//! The per-harness wall-clock collapses to milliseconds once the pool is
//! warm.
//!
//! # Lifecycle
//!
//! A process-wide `OnceLock<Mutex<HashMap<…>>>` registry holds one `Arc`'d pool per toolchain id, lazily spawned on first request.
//! Pools live for the rest of the process; the OS reaps them on exit.
//! Crashes are non-fatal: callers fall back to the legacy direct-spawn path
//! via [`BuildPool::is_healthy`] and a re-spawn on the next call.
//!
//! # Future-language plug-in
//!
//! Per-language sub-modules (`java.rs`, eventually `node.rs`, `python.rs`,
//! …) implement the [`BuildPool`] trait. The harness build dispatcher in
//! [`crate::dynamic::build_sandbox`] reads `NYX_DYNAMIC_BUILD_POOL` and
//! routes each request to the matching pool when enabled.
use std::path::Path;
use std::time::Duration;
pub mod java;
/// Outcome of a single batched compile request.
#[derive(Debug)]
pub struct PoolCompileResult {
/// `true` when the toolchain reported a clean compile.
pub success: bool,
/// Toolchain stderr — surfaced as `BuildError::BuildFailed` upstream
/// when `success == false`. Pool-level failures (worker unavailable,
/// IO errors, malformed responses) are reported here too, as
/// `javac-pool: …` messages from the Java pool.
pub stderr: String,
/// Wall-clock attributed to the compile step. Useful for telemetry;
/// callers may ignore.
///
/// NOTE(review): this was documented as excluding IPC / queue wait
/// time, but the Java pool starts its timer before taking the worker
/// lock and stops it after the response is read, so its values
/// include both. Confirm which contract is intended.
pub duration: Duration,
}
/// Common contract for every per-language build pool.
///
/// Implementations are expected to be `Send + Sync` so an `Arc<dyn BuildPool>`
/// can be cached in a static `OnceLock` and shared across rayon worker
/// threads.
pub trait BuildPool: Send + Sync {
/// Stable identifier — used in log lines + telemetry so an operator
/// can correlate a pool warmup with the harness that triggered it.
fn name(&self) -> &'static str;
/// Run one compile for the harness in `workdir`, passing `args` to the
/// pool's toolchain. On success the toolchain has written
/// artefacts back into `workdir` (or wherever the pool's contract
/// dictates).
///
/// Failures are reported through the returned [`PoolCompileResult`]
/// (`success == false`), not by panicking.
fn compile_batch(&self, workdir: &Path, args: &[String]) -> PoolCompileResult;
/// Cheap health check — when this returns `false`, the harness build
/// dispatcher falls back to the direct-spawn legacy path and tears
/// down the cached handle so the next request triggers a re-spawn.
fn is_healthy(&self) -> bool;
}
/// Parse the `NYX_DYNAMIC_BUILD_POOL` env var.
///
/// Format is a comma-separated list of `lang=bit` entries: `java=1,node=0`.
/// A missing language returns the default (currently `true` for `java`,
/// `false` for every other language because no other pool ships yet).
///
/// Keys and values are both matched case-insensitively, ignoring
/// surrounding whitespace. `1`, `true`, `yes` and `on` enable a pool;
/// any other value disables it. Entries without `=` are ignored, and
/// the first entry for `lang` wins.
pub fn is_pool_enabled(lang: &str) -> bool {
    /// Values that switch a pool on (compared ignoring ASCII case).
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];

    let default = matches!(lang, "java");
    let Ok(raw) = std::env::var("NYX_DYNAMIC_BUILD_POOL") else {
        return default;
    };

    raw.split(',')
        // Empty entries and entries without `=` yield `None` here.
        .filter_map(|entry| entry.trim().split_once('='))
        .find(|(key, _)| key.trim().eq_ignore_ascii_case(lang))
        .map_or(default, |(_, value)| {
            let value = value.trim();
            TRUTHY.iter().any(|truthy| value.eq_ignore_ascii_case(truthy))
        })
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Variable under test.
    const VAR: &str = "NYX_DYNAMIC_BUILD_POOL";

    /// Serialises every test in this module that touches [`VAR`].
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Sets (or clears) [`VAR`] for the guard's lifetime and restores
    /// the prior value on drop.
    ///
    /// The guard owns the `ENV_LOCK` guard, so the variable cannot be
    /// mutated without holding the lock, and the lock is released only
    /// after the prior value has been restored.
    struct EnvGuard {
        prior: Option<String>,
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        fn set(value: Option<&str>) -> Self {
            // A test that failed while holding the lock poisons it; the
            // `()` payload cannot be left inconsistent, so recover rather
            // than failing every later test with a `PoisonError`.
            let lock = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
            let prior = std::env::var(VAR).ok();
            Self::apply(value);
            Self { prior, _lock: lock }
        }

        fn apply(value: Option<&str>) {
            // SAFETY: callers hold `ENV_LOCK`, so tests in this module
            // never mutate the variable concurrently with each other.
            // Threads outside this module that touch the environment at
            // the same time are not covered by this lock.
            match value {
                Some(v) => unsafe { std::env::set_var(VAR, v) },
                None => unsafe { std::env::remove_var(VAR) },
            }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            Self::apply(self.prior.as_deref());
        }
    }

    #[test]
    fn default_enables_java_only() {
        let _g = EnvGuard::set(None);
        assert!(is_pool_enabled("java"));
        assert!(!is_pool_enabled("node"));
        assert!(!is_pool_enabled("python"));
    }

    #[test]
    fn explicit_override_disables_java() {
        let _g = EnvGuard::set(Some("java=0"));
        assert!(!is_pool_enabled("java"));
    }

    #[test]
    fn multi_entry_parses_per_lang() {
        let _g = EnvGuard::set(Some("java=1,node=1,python=0"));
        assert!(is_pool_enabled("java"));
        assert!(is_pool_enabled("node"));
        assert!(!is_pool_enabled("python"));
    }

    #[test]
    fn case_insensitive_keys() {
        let _g = EnvGuard::set(Some("JAVA=0"));
        assert!(!is_pool_enabled("java"));
    }

    #[test]
    fn unknown_value_treated_as_disabled() {
        let _g = EnvGuard::set(Some("java=maybe"));
        assert!(!is_pool_enabled("java"));
    }
}

View file

@ -12,13 +12,17 @@
//! Failed-build retry policy (§12 Q4): one retry on `BuildFailed` with
//! backoff (1s, 4s), then `Inconclusive(BuildFailed, attempts: 2)`.
use crate::dynamic::build_pool::java::JavacPool;
use crate::dynamic::build_pool::{BuildPool, is_pool_enabled};
use crate::dynamic::sandbox::ProcessHardeningProfile;
use crate::dynamic::spec::HarnessSpec;
use crate::symbol::Lang;
use blake3::Hasher;
use directories::ProjectDirs;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, Instant};
// ── Rust build sandbox ────────────────────────────────────────────────────────
@ -787,6 +791,46 @@ fn compute_go_source_hash(workdir: &Path) -> String {
// ── Java build sandbox ────────────────────────────────────────────────────────
/// Process-wide registry of warm `javac` daemons, keyed on
/// `spec.toolchain_id` (`"java-17"`, `"java-21"`, …).
///
/// Each toolchain id gets its own slot. A slot holding `None` records a
/// bootstrap attempt that failed; a slot holding `Some` is a live pool
/// handle shared through its `Arc`.
///
/// The map sits behind `OnceLock<Mutex<HashMap<…>>>` rather than a
/// parameterised `OnceLock` because the toolchain id is only known at
/// request time. The map is created empty on first access.
fn javac_pool_registry() -> &'static Mutex<HashMap<String, Option<Arc<JavacPool>>>> {
    type Registry = Mutex<HashMap<String, Option<Arc<JavacPool>>>>;
    static REGISTRY: OnceLock<Registry> = OnceLock::new();
    REGISTRY.get_or_init(Registry::default)
}
/// Look up (or lazily spawn) a `javac` daemon for `toolchain_id`.
///
/// The first lookup for an id runs the bootstrap while the registry
/// lock is held and stores the outcome -- including a failed one -- so
/// later lookups return the stored slot without bootstrapping again.
///
/// Returns `None` when the bootstrap failed or the registry lock is
/// poisoned -- the caller is expected to fall back to the direct-spawn
/// legacy path.
fn javac_pool_for(toolchain_id: &str) -> Option<Arc<JavacPool>> {
    let mut pools = javac_pool_registry().lock().ok()?;
    match pools.get(toolchain_id) {
        Some(cached) => cached.clone(),
        None => {
            let fresh = JavacPool::try_new(toolchain_id).ok().map(Arc::new);
            pools.insert(toolchain_id.to_owned(), fresh.clone());
            fresh
        }
    }
}
/// Forget the cached `javac` daemon for `toolchain_id` so the next
/// lookup re-spawns it. Called after the dispatcher observes the
/// worker has crashed mid-request. Does nothing when the registry lock
/// is poisoned.
fn drop_javac_pool(toolchain_id: &str) {
    let Ok(mut pools) = javac_pool_registry().lock() else {
        return;
    };
    pools.remove(toolchain_id);
}
/// Prepare compiled Java classes for `spec`.
///
/// Runs `javac` over every `*.java` file in `workdir` (recursive). Phase 14
@ -852,7 +896,12 @@ pub fn prepare_java(spec: &HarnessSpec, workdir: &Path) -> Result<BuildResult, B
));
}
let compile_cache = cache_path.as_deref().unwrap_or(workdir);
match try_compile_java(workdir, compile_cache, target_release) {
match try_compile_java_with_toolchain(
workdir,
compile_cache,
target_release,
&spec.toolchain_id,
) {
Ok(()) => {
let build_root = cache_path.clone().unwrap_or_else(|| workdir.to_path_buf());
return Ok(BuildResult {
@ -916,13 +965,17 @@ fn java_target_release(toolchain_id: &str) -> Option<u32> {
}
}
fn try_compile_java(
/// Compile every `.java` under `workdir`.
///
/// `toolchain_id` is threaded down so the pool path (when enabled) can
/// shard its cached [`JavacPool`] handles by JDK version: `"java-17"`
/// and `"java-21"` get separate worker JVMs.
fn try_compile_java_with_toolchain(
workdir: &Path,
cache_path: &Path,
target_release: Option<u32>,
toolchain_id: &str,
) -> Result<(), String> {
let javac = std::env::var("NYX_JAVAC_BIN").unwrap_or_else(|_| "javac".to_owned());
// If the harness emitter shipped a `pom.xml`, stage Maven-resolved
// jars under `workdir/lib` so javac (and the runtime classpath
// baked into the harness command) can resolve framework imports
@ -951,6 +1004,30 @@ fn try_compile_java(
args.push(src.to_string_lossy().into_owned());
}
// Route through the warm `javac` daemon when the pool is enabled
// and a worker can be brought up. Bootstrap failures fall back to
// the direct-spawn legacy path so an operator with a broken JDK
// install still gets a deterministic build error from `javac`
// itself rather than from the pool wrapper.
if is_pool_enabled("java") {
if let Some(pool) = javac_pool_for(toolchain_id) {
let result = pool.compile_batch(workdir, &args);
if result.success {
return finalize_java_compile(workdir, cache_path, lib_on_cp);
}
if pool.is_healthy() {
// The compile itself failed (real source error) -- surface
// the worker's stderr verbatim.
return Err(result.stderr);
}
// Worker crashed: drop the cached pool so the next call
// re-spawns it, then fall through to the legacy direct-spawn
// path so this build still has a chance to succeed.
drop_javac_pool(toolchain_id);
}
}
let javac = std::env::var("NYX_JAVAC_BIN").unwrap_or_else(|_| "javac".to_owned());
let output = Command::new(&javac)
.args(&args)
.current_dir(workdir)
@ -964,6 +1041,13 @@ fn try_compile_java(
return Err(String::from_utf8_lossy(&output.stderr).into_owned());
}
finalize_java_compile(workdir, cache_path, lib_on_cp)
}
/// Shared post-compile step: copy class files (and any Maven `lib/`)
/// from the workdir into the cache slot so the next cache-hit restore
/// can rebuild the harness layout without recompiling.
fn finalize_java_compile(workdir: &Path, cache_path: &Path, lib_on_cp: bool) -> Result<(), String> {
if cache_path != workdir {
// Copy class files to cache. `javac -d workdir` writes nested
// package directories under workdir; preserve the relative layout

View file

@ -3,7 +3,7 @@
//! Recognises actix's `#[get("/path")]` / `#[post("/path")]`
//! attribute macros on handler functions:
//!
//! ```rust
//! ```rust,ignore
//! #[get("/users/{id}")]
//! async fn show(id: web::Path<String>) -> impl Responder { id }
//! ```

View file

@ -2,7 +2,7 @@
//!
//! Recognises the canonical axum route builder:
//!
//! ```rust
//! ```rust,ignore
//! let app = Router::new()
//! .route("/users/{id}", get(show))
//! .route("/save", post(save));

View file

@ -3,7 +3,7 @@
//! Recognises rocket's `#[get("/path")]` / `#[post("/path")]`
//! attribute macros plus the `routes![handler]` macro:
//!
//! ```rust
//! ```rust,ignore
//! #[get("/users/<id>")]
//! fn show(id: String) -> String { id }
//!

View file

@ -3,7 +3,7 @@
//! Recognises warp's `warp::path!(...)` macro chained with `.map(...)`
//! or `.and_then(...)` to bridge into a handler function:
//!
//! ```rust
//! ```rust,ignore
//! let r = warp::path!("users" / u32)
//! .and(warp::get())
//! .map(show);

View file

@ -65,6 +65,7 @@
//! [`SpecDerivationStrategy::FromFuncSummaryWalk`]: spec::SpecDerivationStrategy::FromFuncSummaryWalk
//! [`SpecDerivationStrategy::FromCallgraphEntry`]: spec::SpecDerivationStrategy::FromCallgraphEntry
pub mod build_pool;
pub mod build_sandbox;
pub mod corpus;
pub mod differential;

View file

@ -2192,6 +2192,7 @@ fn handle_rabbit_amqp_connection(
break;
};
let payload = String::from_utf8_lossy(&body).into_owned();
let _ = append_broker_event(log_path, "publish", &routing_key, &payload);
let destinations =
rabbit_amqp_publish_destinations(&state, &exchange, &routing_key);
for destination in &destinations {
@ -2204,7 +2205,6 @@ fn handle_rabbit_amqp_connection(
rabbit_amqp_enqueue(&state, destination, &payload);
}
}
let _ = append_broker_event(log_path, "publish", &routing_key, &payload);
if confirms_enabled {
next_publish_tag = next_publish_tag.saturating_add(1);
if amqp_write_basic_ack(&mut writer, frame.channel, next_publish_tag, false)