refactor(scan, dynamic): implement cap-routed concurrency lanes for batched verification and prewarmed sandbox baseline directories; enhance handling for streaming pull tasks

This commit is contained in:
elipeter 2026-05-29 11:45:34 -05:00
parent bd76cd5b9d
commit acdc71cd88
9 changed files with 916 additions and 15 deletions

View file

@ -0,0 +1 @@
{"sessionId":"6c158e05-a83e-4808-acf4-12ad7b0fe983","pid":8358,"procStart":"Fri May 29 15:24:35 2026","acquiredAt":1780071990470}

View file

@ -330,8 +330,41 @@ pub(crate) fn verify_findings_for_scan(
}
let telemetry_log = crate::dynamic::telemetry::log_path();
for diag in diags {
let mut result = crate::dynamic::verify::verify_finding(diag, &opts);
// Track P.0: route per-finding verification through cap-keyed concurrency
// lanes so a slow `DESERIALIZE` harness can't head-of-line block fast
// `SSRF` ones. `verify_finding` takes `&Diag`, so the parallel phase is a
// pure read; verdicts are applied back in input order afterwards, keeping
// the verdict sequence identical to the sequential path (determinism
// contract). `NYX_DYNAMIC_VERIFY_PARALLEL=0` forces the legacy loop.
let parallel = std::env::var("NYX_DYNAMIC_VERIFY_PARALLEL")
.map(|v| !matches!(v.trim(), "0" | "false" | "no" | "off"))
.unwrap_or(true);
let results: Vec<crate::dynamic::report::VerifyResult> = if parallel && diags.len() > 1 {
let lane_trace = verbose.then(|| std::sync::Arc::new(crate::dynamic::trace::VerifyTrace::new()));
let out = crate::dynamic::runner::WorkerPool::run_in_lanes(
&*diags,
lane_trace.as_ref(),
|d| {
crate::labels::Cap::from_bits_truncate(
d.evidence.as_ref().map_or(0, |e| e.sink_caps),
)
},
|_, d| crate::dynamic::verify::verify_finding(d, &opts),
);
if let Some(trace) = &lane_trace {
trace.print_to_stderr();
}
out
} else {
diags
.iter()
.map(|d| crate::dynamic::verify::verify_finding(d, &opts))
.collect()
};
for (diag, mut result) in diags.iter_mut().zip(results) {
if result.status == crate::dynamic::report::VerifyStatus::Confirmed
&& let Some(ref log_path) = telemetry_log
{

View file

@ -17,6 +17,7 @@ use crate::dynamic::lang;
use crate::dynamic::spec::HarnessSpec;
use crate::evidence::UnsupportedReason;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
@ -183,7 +184,7 @@ fn copy_entry_file(spec: &HarnessSpec, workdir: &Path, entry_subpath: Option<&st
let _ = fs::write(&dst, rewritten.as_bytes());
return;
}
let _ = fs::copy(src, &dst);
let _ = copy_workdir(src, &dst);
return;
}
}
@ -247,7 +248,7 @@ fn copy_java_sibling_sources(spec: &HarnessSpec, workdir: &Path) {
continue;
};
if name == "pom.xml" {
let _ = fs::copy(&p, workdir.join(name));
let _ = copy_workdir(&p, &workdir.join(name));
continue;
}
if !p.extension().map(|e| e == "java").unwrap_or(false) {
@ -256,7 +257,7 @@ fn copy_java_sibling_sources(spec: &HarnessSpec, workdir: &Path) {
if name == entry_name || name == alt_name {
continue;
}
let _ = fs::copy(&p, workdir.join(name));
let _ = copy_workdir(&p, &workdir.join(name));
}
}
@ -269,10 +270,10 @@ fn copy_php_project_manifests(spec: &HarnessSpec, workdir: &Path) {
while let Some(current) = dir {
let composer_json = current.join("composer.json");
if composer_json.exists() {
let _ = fs::copy(&composer_json, workdir.join("composer.json"));
let _ = copy_workdir(&composer_json, &workdir.join("composer.json"));
let composer_lock = current.join("composer.lock");
if composer_lock.exists() {
let _ = fs::copy(composer_lock, workdir.join("composer.lock"));
let _ = copy_workdir(&composer_lock, &workdir.join("composer.lock"));
}
return;
}
@ -280,6 +281,176 @@ fn copy_php_project_manifests(spec: &HarnessSpec, workdir: &Path) {
}
}
/// Copy-on-write clone of `src` into `dst` (Track P.0).
///
/// Per-finding workdir staging used to `std::fs::copy` every harness file,
/// paying a full byte copy for each of the 50+ findings an OWASP run touches.
/// On a CoW filesystem the kernel can share the underlying extents instead, so
/// setup cost drops from tens of milliseconds to near zero:
///
/// - **macOS** — `clonefile(2)` clones a file *or an entire directory tree* in
/// a single syscall (the [`clone_dir`] fast path).
/// - **Linux** — `ioctl(FICLONE)` reflinks on btrfs/xfs; `copy_file_range(2)`
/// is the ext4 fallback (in-kernel copy, reflink when the FS supports it).
/// - **Anywhere else / unsupported FS** — falls back to `std::fs::copy`, so
/// behaviour is identical, only slower.
///
/// The top-level `src` is resolved through symlinks (mirroring the `fs::copy`
/// semantics the staging code relied on, so a symlinked entry file copies its
/// target's contents). Symlinks *inside* a cloned tree are preserved verbatim
/// so a baseline snapshot keeps the toolchain's `node_modules/.bin` /
/// `vendor` link structure intact.
pub(crate) fn copy_workdir(src: &Path, dst: &Path) -> io::Result<()> {
let meta = fs::metadata(src)?;
if meta.is_dir() {
clone_dir(src, dst)
} else {
clone_file(src, dst)
}
}
/// Recursively clone a directory tree, preserving internal symlinks.
fn clone_dir(src: &Path, dst: &Path) -> io::Result<()> {
// macOS: `clonefile` clones the whole tree (CoW) in one syscall when the
// destination does not yet exist — the P50 ≤ 5ms baseline-snapshot path.
#[cfg(target_os = "macos")]
if !dst.exists() && clonefile_cow(src, dst).is_ok() {
return Ok(());
}
fs::create_dir_all(dst)?;
for entry in fs::read_dir(src)? {
let entry = entry?;
let from = entry.path();
let to = dst.join(entry.file_name());
let ft = entry.file_type()?;
if ft.is_symlink() {
copy_symlink(&from, &to)?;
} else if ft.is_dir() {
clone_dir(&from, &to)?;
} else {
clone_file(&from, &to)?;
}
}
Ok(())
}
/// CoW-clone a single regular file, falling back to a byte copy.
fn clone_file(src: &Path, dst: &Path) -> io::Result<()> {
#[cfg(target_os = "macos")]
if clonefile_cow(src, dst).is_ok() {
return Ok(());
}
#[cfg(target_os = "linux")]
if reflink_cow(src, dst).is_ok() {
return Ok(());
}
fs::copy(src, dst).map(|_| ())
}
/// Recreate `src` (a symlink) at `dst` rather than following it.
fn copy_symlink(src: &Path, dst: &Path) -> io::Result<()> {
let _ = fs::remove_file(dst);
#[cfg(unix)]
{
let target = fs::read_link(src)?;
std::os::unix::fs::symlink(target, dst)
}
#[cfg(not(unix))]
{
// No portable symlink API: copy the resolved file contents.
clone_file(src, dst)
}
}
/// macOS `clonefile(2)` wrapper. Honours overwrite semantics by removing an
/// existing destination first (`clonefile` fails with `EEXIST` otherwise).
#[cfg(target_os = "macos")]
fn clonefile_cow(src: &Path, dst: &Path) -> io::Result<()> {
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
unsafe extern "C" {
fn clonefile(src: *const i8, dst: *const i8, flags: u32) -> i32;
}
let _ = fs::remove_file(dst);
let csrc = CString::new(src.as_os_str().as_bytes())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let cdst = CString::new(dst.as_os_str().as_bytes())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
// flags = 0: follow a symlinked `src` and clone its target.
let ret = unsafe { clonefile(csrc.as_ptr(), cdst.as_ptr(), 0) };
if ret == 0 {
Ok(())
} else {
Err(io::Error::last_os_error())
}
}
/// Linux CoW clone: `ioctl(FICLONE)` reflink first, `copy_file_range(2)`
/// fallback. Preserves the source mode so cloned toolchain binaries keep
/// their executable bit.
#[cfg(target_os = "linux")]
fn reflink_cow(src: &Path, dst: &Path) -> io::Result<()> {
use std::os::unix::io::AsRawFd;
// FICLONE = _IOW(0x94, 9, int) on the asm-generic ABI (x86_64, aarch64).
const FICLONE: u64 = 0x4004_9409;
unsafe extern "C" {
fn ioctl(fd: i32, request: u64, ...) -> i32;
fn copy_file_range(
fd_in: i32,
off_in: *mut i64,
fd_out: i32,
off_out: *mut i64,
len: usize,
flags: u32,
) -> isize;
}
let src_file = fs::File::open(src)?;
let meta = src_file.metadata()?;
let dst_file = fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(dst)?;
let src_fd = src_file.as_raw_fd();
let dst_fd = dst_file.as_raw_fd();
// Fast path: whole-file reflink (btrfs/xfs).
let cloned = unsafe { ioctl(dst_fd, FICLONE, src_fd) } == 0;
if !cloned {
// ext4 / overlayfs fallback: in-kernel copy (reflink when supported).
let mut remaining = meta.len() as usize;
while remaining > 0 {
let n = unsafe {
copy_file_range(
src_fd,
std::ptr::null_mut(),
dst_fd,
std::ptr::null_mut(),
remaining,
0,
)
};
if n < 0 {
return Err(io::Error::last_os_error());
}
if n == 0 {
break; // short source / EOF
}
remaining -= n as usize;
}
}
// Neither FICLONE nor copy_file_range copies the mode bits.
fs::set_permissions(dst, meta.permissions())?;
Ok(())
}
/// Extract the source of the entry file (for repro bundles). Best-effort.
fn extract_entry_source(spec: &HarnessSpec) -> String {
let candidates = [
@ -437,4 +608,85 @@ mod tests {
assert!(harness.workdir.join("pom.xml").exists());
assert!(!harness.workdir.join("Benign.java").exists());
}
#[test]
fn copy_workdir_clones_file_contents() {
let tmp = tempfile::TempDir::new().unwrap();
let src = tmp.path().join("src.txt");
let dst = tmp.path().join("dst.txt");
fs::write(&src, b"hello clonefile\n").unwrap();
copy_workdir(&src, &dst).unwrap();
assert_eq!(fs::read(&dst).unwrap(), b"hello clonefile\n");
}
#[test]
fn copy_workdir_overwrites_existing_dest() {
let tmp = tempfile::TempDir::new().unwrap();
let src = tmp.path().join("src.txt");
let dst = tmp.path().join("dst.txt");
fs::write(&src, b"new contents").unwrap();
fs::write(&dst, b"STALE STALE STALE").unwrap();
copy_workdir(&src, &dst).unwrap();
assert_eq!(fs::read(&dst).unwrap(), b"new contents");
}
#[test]
fn copy_workdir_clones_directory_tree() {
let tmp = tempfile::TempDir::new().unwrap();
let src = tmp.path().join("tree");
fs::create_dir_all(src.join("nested")).unwrap();
fs::write(src.join("top.txt"), b"top").unwrap();
fs::write(src.join("nested").join("deep.txt"), b"deep").unwrap();
let dst = tmp.path().join("clone");
copy_workdir(&src, &dst).unwrap();
assert_eq!(fs::read(dst.join("top.txt")).unwrap(), b"top");
assert_eq!(fs::read(dst.join("nested").join("deep.txt")).unwrap(), b"deep");
}
#[cfg(unix)]
#[test]
fn copy_workdir_preserves_internal_symlinks() {
let tmp = tempfile::TempDir::new().unwrap();
let src = tmp.path().join("tree");
fs::create_dir_all(&src).unwrap();
fs::write(src.join("real.txt"), b"real").unwrap();
std::os::unix::fs::symlink("real.txt", src.join("link.txt")).unwrap();
let dst = tmp.path().join("clone");
copy_workdir(&src, &dst).unwrap();
let link = dst.join("link.txt");
assert!(
fs::symlink_metadata(&link).unwrap().file_type().is_symlink(),
"internal symlink must be preserved, not dereferenced"
);
assert_eq!(fs::read(&link).unwrap(), b"real");
}
#[test]
#[ignore = "Phase 24 perf bench: per-finding workdir clone P50 ≤ 5ms (CoW). Opt-in so the default suite stays hermetic + fast. Run: cargo nextest run --features dynamic --run-ignored ignored-only -E 'test(~copy_workdir_perf)'"]
fn copy_workdir_perf_p50_under_5ms() {
use std::time::{Duration, Instant};
let tmp = tempfile::TempDir::new().unwrap();
// Representative harness workdir: entry source + siblings + manifest.
let src = tmp.path().join("src");
fs::create_dir_all(&src).unwrap();
fs::write(src.join("Vuln.java"), "public class Vuln {}\n".repeat(60)).unwrap();
fs::write(src.join("Helper.java"), "class Helper {}\n".repeat(20)).unwrap();
fs::write(src.join("pom.xml"), "<project></project>\n".repeat(30)).unwrap();
let n = 50usize;
let mut samples = Vec::with_capacity(n);
for i in 0..n {
let dst = tmp.path().join(format!("clone{i}"));
let t = Instant::now();
copy_workdir(&src, &dst).unwrap();
samples.push(t.elapsed());
}
samples.sort();
let p50 = samples[n / 2];
eprintln!("phase24 copy_workdir: P50 = {p50:?} over {n} clones");
assert!(
p50 <= Duration::from_millis(5),
"phase24 acceptance gate: workdir clone P50 {p50:?}, expected ≤ 5ms"
);
}
}

View file

@ -20,8 +20,10 @@ use crate::dynamic::spec::HarnessSpec;
use crate::dynamic::stubs::StubEvent;
use crate::dynamic::trace::{TraceStage, VerifyTrace};
use crate::evidence::{DifferentialOutcome, DifferentialVerdict};
use crate::labels::Cap;
use crate::symbol::Lang;
use std::sync::Arc;
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
/// Record a trace event on the caller's [`VerifyTrace`] handle if one
/// was attached to [`SandboxOptions::trace`]. No-op otherwise — keeps
@ -727,6 +729,155 @@ fn generate_nonce() -> String {
format!("{mixed:016x}")
}
/// Per-lane bounded-channel capacity (Track P.0).
///
/// Small on purpose: lanes are backpressure-bounded so a fast feeder cannot
/// queue the whole batch ahead of a slow worker, but large enough that a
/// worker never starves waiting on the feeder for the next item.
const LANE_CHANNEL_CAP: usize = 4;
/// Cap-routed concurrency lanes for batched verification (Track P.0).
///
/// A single-queue verifier lets one slow `DESERIALIZE` harness (JVM spin-up,
/// gadget-chain payloads) head-of-line block a queue full of fast `SSRF`
/// findings. [`WorkerPool::run_in_lanes`] instead routes each finding to a
/// lane keyed by its capability: every cap drains its *own* set of bounded
/// channels with a per-cap worker budget from [`WorkerPool::lanes_for_cap`],
/// and all caps run concurrently, so a slow cap throttles only itself.
///
/// Results are returned in input order regardless of lane scheduling, so the
/// verdict sequence stays deterministic (the engine's determinism contract is
/// about verdicts, not wall-clock interleaving).
pub struct WorkerPool;
impl WorkerPool {
/// Concurrency budget for `cap`'s lanes.
///
/// Verification is dominated by per-harness subprocess wall-time, not CPU,
/// so wide lanes for cheap independent caps (SSRF) pay off even past the
/// core count, while expensive caps stay narrow so one harness can't
/// monopolise the host. Expensive caps are checked first so a combined
/// cap-set inherits the *narrower* lane.
pub fn lanes_for_cap(cap: Cap) -> usize {
if cap.contains(Cap::CRYPTO) {
1
} else if cap.contains(Cap::DESERIALIZE) || cap.contains(Cap::CODE_EXEC) {
2
} else if cap.contains(Cap::SSRF) {
8
} else {
4
}
}
/// Run `work(i, &items[i])` for every item, routed through per-cap lanes.
///
/// `cap_of` extracts the routing capability for each item. Returns one
/// output per input, in input order. Empty / single-item batches run
/// inline (no threads) so trivial scans pay no concurrency overhead.
///
/// `trace`, when present, receives a deterministic
/// [`TraceStage::WorkerLaneAssigned`] event per item (recorded in a
/// single-threaded pre-pass so the trace order does not depend on lane
/// scheduling).
pub fn run_in_lanes<I, O, C, W>(
items: &[I],
trace: Option<&Arc<VerifyTrace>>,
cap_of: C,
work: W,
) -> Vec<O>
where
I: Sync,
O: Send,
C: Fn(&I) -> Cap + Sync,
W: Fn(usize, &I) -> O + Sync,
{
// Group item indices by cap (BTreeMap over the raw bits keeps both the
// pre-pass trace and lane spawning in a stable, reproducible order).
let mut groups: BTreeMap<u32, Vec<usize>> = BTreeMap::new();
for (i, item) in items.iter().enumerate() {
groups.entry(cap_of(item).bits()).or_default().push(i);
}
// Deterministic lane-assignment trace, single-threaded.
if trace.is_some() {
for (bits, idxs) in &groups {
let cap = Cap::from_bits_truncate(*bits);
let lanes = Self::lanes_for_cap(cap).max(1);
for (pos, _) in idxs.iter().enumerate() {
trace_record(
trace,
TraceStage::WorkerLaneAssigned,
Some(format!(
"cap={} lane={}",
crate::labels::cap_to_name(cap),
pos % lanes
)),
);
}
}
}
// Inline fast path: nothing to parallelise.
if items.len() <= 1 {
return items
.iter()
.enumerate()
.map(|(i, it)| work(i, it))
.collect();
}
let results: Vec<Mutex<Option<O>>> =
(0..items.len()).map(|_| Mutex::new(None)).collect();
std::thread::scope(|scope| {
let results = &results;
let work = &work;
for (bits, idxs) in groups {
let cap = Cap::from_bits_truncate(bits);
let lanes = Self::lanes_for_cap(cap).max(1);
// One bounded channel + one worker per lane.
let mut senders = Vec::with_capacity(lanes);
for _ in 0..lanes {
let (tx, rx) = crossbeam_channel::bounded::<usize>(LANE_CHANNEL_CAP);
senders.push(tx);
scope.spawn(move || {
while let Ok(idx) = rx.recv() {
let out = work(idx, &items[idx]);
if let Ok(mut slot) = results[idx].lock() {
*slot = Some(out);
}
}
});
}
// Dedicated feeder per cap so feeding one group never blocks
// another group's workers from starting (cross-cap isolation).
scope.spawn(move || {
for (pos, idx) in idxs.into_iter().enumerate() {
let lane = pos % lanes;
if senders[lane].send(idx).is_err() {
break;
}
}
// `senders` drops here → each lane's rx closes → worker exits.
});
}
});
results
.into_iter()
.map(|m| {
m.into_inner()
.ok()
.flatten()
.expect("every lane worker writes its result slot")
})
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -802,4 +953,87 @@ mod tests {
);
assert!(is_runtime_import_error(&outcome));
}
#[test]
fn lanes_for_cap_matches_table() {
assert_eq!(WorkerPool::lanes_for_cap(Cap::SSRF), 8);
assert_eq!(WorkerPool::lanes_for_cap(Cap::DESERIALIZE), 2);
assert_eq!(WorkerPool::lanes_for_cap(Cap::CODE_EXEC), 2);
assert_eq!(WorkerPool::lanes_for_cap(Cap::CRYPTO), 1);
// Unlisted cap falls back to the default lane width.
assert_eq!(WorkerPool::lanes_for_cap(Cap::SQL_QUERY), 4);
// Expensive cap wins a combined cap-set (narrower lane).
assert_eq!(WorkerPool::lanes_for_cap(Cap::SSRF | Cap::CRYPTO), 1);
}
#[test]
fn run_in_lanes_preserves_input_order() {
// Mixed caps across many items: results must come back indexed by
// input position regardless of which lane finished first.
let caps = [
Cap::SSRF,
Cap::DESERIALIZE,
Cap::CRYPTO,
Cap::SQL_QUERY,
Cap::SSRF,
Cap::CRYPTO,
];
let items: Vec<(usize, Cap)> = caps.iter().copied().enumerate().collect();
let out = WorkerPool::run_in_lanes(
&items,
None,
|&(_, cap)| cap,
|i, &(orig, _)| {
assert_eq!(i, orig);
orig * 10
},
);
assert_eq!(out, vec![0, 10, 20, 30, 40, 50]);
}
#[test]
fn run_in_lanes_runs_every_item_once() {
use std::sync::atomic::{AtomicUsize, Ordering};
let items: Vec<Cap> = (0..64)
.map(|i| match i % 4 {
0 => Cap::SSRF,
1 => Cap::DESERIALIZE,
2 => Cap::CRYPTO,
_ => Cap::SQL_QUERY,
})
.collect();
let calls = AtomicUsize::new(0);
let out = WorkerPool::run_in_lanes(
&items,
None,
|c| *c,
|i, _| {
calls.fetch_add(1, Ordering::Relaxed);
i
},
);
assert_eq!(calls.load(Ordering::Relaxed), 64);
assert_eq!(out, (0..64).collect::<Vec<_>>());
}
#[test]
fn run_in_lanes_emits_deterministic_lane_trace() {
let items = [Cap::SSRF, Cap::CRYPTO, Cap::SSRF];
let trace_a = Arc::new(VerifyTrace::new());
let _ = WorkerPool::run_in_lanes(&items, Some(&trace_a), |c| *c, |i, _| i);
let trace_b = Arc::new(VerifyTrace::new());
let _ = WorkerPool::run_in_lanes(&items, Some(&trace_b), |c| *c, |i, _| i);
let events_a = trace_a.events();
// One WorkerLaneAssigned per item.
assert_eq!(
events_a
.iter()
.filter(|e| e.stage == TraceStage::WorkerLaneAssigned)
.count(),
3
);
// Deterministic across runs.
assert_eq!(trace_a.to_jsonl(), trace_b.to_jsonl());
}
}

View file

@ -0,0 +1,258 @@
//! Prewarmed sandbox baseline directories (Track P.0).
//!
//! A harness needs the language toolchain's heavyweight dependency tree
//! (`node_modules`, `vendor`, `target/`, …) but that tree is identical across
//! every finding in a run — installing it per-finding is the bulk of the
//! per-workdir setup cost. A [`Baseline`] holds one shared, warmed copy under
//! the build-pool cache dir; each per-finding workdir gets a cheap snapshot of
//! it:
//!
//! - **macOS** — a `clonefile` CoW snapshot (via
//! [`crate::dynamic::harness::copy_workdir`]).
//! - **Linux** — a read-only `mount --bind`, falling back to a reflink copy
//! when bind mounts are unavailable (no `CAP_SYS_ADMIN` / not in a mount
//! namespace).
//!
//! The baseline root honours `NYX_BUILD_POOL_DIR` through
//! [`crate::dynamic::build_pool::pool_cache_dir`], so tests can redirect it
//! into a `TempDir` and it shares the same on-disk layout as the Phase 22/23
//! build pools (`<cache>/dynamic/build-pool/<lang>/baseline`).
use crate::symbol::Lang;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
/// Canonical pinned toolchain subdirectories per language.
///
/// These are the content-addressed dependency trees a harness needs but that
/// never change between findings, so they are warmed once in the shared
/// baseline and snapshotted into each per-finding workdir. Languages whose
/// harnesses carry no pinned tree (C / C++) return an empty slice.
pub fn pinned_subdirs(lang: Lang) -> &'static [&'static str] {
match lang {
Lang::JavaScript | Lang::TypeScript => &["node_modules"],
Lang::Php => &["vendor"],
Lang::Ruby => &["vendor/bundle"],
Lang::Rust => &["target"],
Lang::Go => &["go-pkg"],
Lang::Python => &[".venv"],
Lang::Java => &["lib"],
Lang::C | Lang::Cpp => &[],
}
}
/// Build-pool cache slug for `lang` — matches the Phase 22/23 pool layout so
/// the baseline lives next to its toolchain's pool caches.
fn lang_slug(lang: Lang) -> &'static str {
match lang {
Lang::JavaScript | Lang::TypeScript => "node",
Lang::Python => "python",
Lang::Php => "php",
Lang::Ruby => "ruby",
Lang::Go => "go",
Lang::Rust => "rust",
Lang::Java => "java",
Lang::C => "c",
Lang::Cpp => "cpp",
}
}
/// A shared, prewarmed baseline directory for one language toolchain.
pub struct Baseline {
lang: Lang,
root: PathBuf,
}
impl Baseline {
/// Locate (and create) the shared baseline root for `lang`.
///
/// Returns `None` only when no cache dir is available (neither
/// `NYX_BUILD_POOL_DIR` nor a platform cache dir) — callers then skip the
/// baseline and stage the workdir the legacy way.
pub fn ensure(lang: Lang) -> Option<Self> {
let root = crate::dynamic::build_pool::pool_cache_dir(lang_slug(lang), "baseline")?;
Some(Self { lang, root })
}
/// Root directory holding the warmed pinned subdirs.
pub fn root(&self) -> &Path {
&self.root
}
/// True when at least one pinned subdir is present and non-empty — i.e. a
/// prior `prepare_*` build has warmed the baseline. A cold baseline makes
/// [`Self::snapshot_into`] a no-op so the caller falls back to a normal
/// per-workdir install.
pub fn is_warm(&self) -> bool {
pinned_subdirs(self.lang).iter().any(|sub| {
let p = self.root.join(sub);
p.is_dir()
&& fs::read_dir(&p)
.map(|mut d| d.next().is_some())
.unwrap_or(false)
})
}
/// Snapshot every warmed pinned subdir into `workdir`.
///
/// macOS uses a `clonefile` CoW snapshot; Linux attempts a read-only
/// `mount --bind` and falls back to a reflink copy when bind mounts are
/// unavailable. Missing subdirs are skipped, so a partially warmed
/// baseline still snapshots what it has.
pub fn snapshot_into(&self, workdir: &Path) -> io::Result<()> {
for sub in pinned_subdirs(self.lang) {
let src = self.root.join(sub);
if !src.is_dir() {
continue;
}
let dst = workdir.join(sub);
if let Some(parent) = dst.parent() {
fs::create_dir_all(parent)?;
}
#[cfg(target_os = "linux")]
if bind_mount_ro(&src, &dst).is_ok() {
continue;
}
crate::dynamic::harness::copy_workdir(&src, &dst)?;
}
Ok(())
}
}
/// Read-only `mount --bind src dst` on Linux.
///
/// A bind mount cannot be made read-only in a single call: Linux applies the
/// `MS_RDONLY` flag only on a subsequent `MS_REMOUNT`. A failed remount leaves
/// the read-write bind in place (still far cheaper than a copy), so the harness
/// gets the dependency tree either way; the read-only guarantee is best-effort.
#[cfg(target_os = "linux")]
fn bind_mount_ro(src: &Path, dst: &Path) -> io::Result<()> {
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
unsafe extern "C" {
fn mount(
src: *const core::ffi::c_char,
target: *const core::ffi::c_char,
fstype: *const core::ffi::c_char,
flags: u64,
data: *const core::ffi::c_void,
) -> i32;
}
const MS_RDONLY: u64 = 0x1;
const MS_REMOUNT: u64 = 0x20;
const MS_BIND: u64 = 0x1000;
const MS_REC: u64 = 0x4000;
fs::create_dir_all(dst)?;
let csrc =
CString::new(src.as_os_str().as_bytes()).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let cdst =
CString::new(dst.as_os_str().as_bytes()).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let bind = unsafe {
mount(
csrc.as_ptr(),
cdst.as_ptr(),
std::ptr::null(),
MS_BIND | MS_REC,
std::ptr::null(),
)
};
if bind != 0 {
return Err(io::Error::last_os_error());
}
// Best-effort read-only remount; leave the rw bind if it fails.
unsafe {
mount(
std::ptr::null(),
cdst.as_ptr(),
std::ptr::null(),
MS_BIND | MS_REMOUNT | MS_RDONLY | MS_REC,
std::ptr::null(),
)
};
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Mutex, MutexGuard};
static ENV_LOCK: Mutex<()> = Mutex::new(());
struct PoolDirGuard {
_lock: MutexGuard<'static, ()>,
prior: Option<String>,
}
impl PoolDirGuard {
fn set(path: &Path) -> Self {
let lock = ENV_LOCK
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let prior = std::env::var("NYX_BUILD_POOL_DIR").ok();
unsafe { std::env::set_var("NYX_BUILD_POOL_DIR", path) };
Self { _lock: lock, prior }
}
}
impl Drop for PoolDirGuard {
fn drop(&mut self) {
match self.prior.take() {
Some(v) => unsafe { std::env::set_var("NYX_BUILD_POOL_DIR", v) },
None => unsafe { std::env::remove_var("NYX_BUILD_POOL_DIR") },
}
}
}
#[test]
fn pinned_subdirs_cover_dependency_trees() {
assert_eq!(pinned_subdirs(Lang::JavaScript), &["node_modules"]);
assert_eq!(pinned_subdirs(Lang::Php), &["vendor"]);
assert_eq!(pinned_subdirs(Lang::Rust), &["target"]);
assert!(pinned_subdirs(Lang::C).is_empty());
}
#[test]
fn cold_baseline_is_not_warm() {
let tmp = tempfile::TempDir::new().unwrap();
let _g = PoolDirGuard::set(tmp.path());
let baseline = Baseline::ensure(Lang::JavaScript).expect("baseline root");
assert!(!baseline.is_warm(), "empty baseline must be cold");
}
#[test]
fn warm_baseline_snapshots_into_workdir() {
let tmp = tempfile::TempDir::new().unwrap();
let _g = PoolDirGuard::set(tmp.path());
let baseline = Baseline::ensure(Lang::JavaScript).expect("baseline root");
// Warm the baseline: write a fake node_modules tree into the root.
let pkg = baseline.root().join("node_modules").join("left-pad");
fs::create_dir_all(&pkg).unwrap();
fs::write(pkg.join("index.js"), b"module.exports = 1;\n").unwrap();
assert!(baseline.is_warm(), "populated baseline must report warm");
// Snapshot it into a fresh per-finding workdir.
let workdir = tempfile::TempDir::new().unwrap();
baseline.snapshot_into(workdir.path()).unwrap();
let cloned = workdir.path().join("node_modules").join("left-pad").join("index.js");
assert!(cloned.exists(), "snapshot must materialise node_modules");
assert_eq!(fs::read(&cloned).unwrap(), b"module.exports = 1;\n");
}
#[test]
fn snapshot_of_cold_baseline_is_noop() {
let tmp = tempfile::TempDir::new().unwrap();
let _g = PoolDirGuard::set(tmp.path());
let baseline = Baseline::ensure(Lang::Rust).expect("baseline root");
let workdir = tempfile::TempDir::new().unwrap();
// No pinned subdir present → snapshot succeeds and writes nothing.
baseline.snapshot_into(workdir.path()).unwrap();
assert!(!workdir.path().join("target").exists());
}
}

View file

@ -86,6 +86,15 @@ pub enum HardeningRecord {
/// pin when one is available.
pub mod docker;
/// Phase 24 (Track P.0) — prewarmed sandbox baseline directories.
///
/// Holds one shared, warmed copy of each language toolchain's pinned
/// dependency tree (`node_modules`, `vendor`, `target/`, …) and CoW-snapshots
/// (macOS) or read-only bind-mounts (Linux) it into every per-finding workdir,
/// so per-workdir setup cost collapses from a full dependency install to a
/// near-free clone.
pub mod baseline;
// ── Harness interpretation probe ──────────────────────────────────────────────
/// Returns true when the harness is driven by an interpreter (Python, Node, …)

View file

@ -1539,18 +1539,32 @@ async fn handle_pubsub_grpc_connection(
let Ok(mut connection) = h2::server::handshake(stream).await else {
return;
};
// Each request runs as its own task so the accept loop keeps polling the
// h2 connection. The single connection future is the sole driver of socket
// I/O: a StreamingPull handler awaits client frames and flushes responses
// across many turns, so running it inline would starve the driver — the
// queued response could never flush and the client's `response.await`
// would park forever (the pre-fix deadhang).
let mut tasks = Vec::new();
while let Some(request) = connection.accept().await {
let Ok((request, respond)) = request else {
break;
};
let path = request.uri().path().to_owned();
let body = request.into_body();
if path.ends_with("/StreamingPull") {
handle_pubsub_streaming_pull(body, respond, Arc::clone(&state), &log_path).await;
} else {
let body = pubsub_grpc_read_all(body).await;
handle_pubsub_unary(&path, &body, respond, Arc::clone(&state), &log_path).await;
}
let state = Arc::clone(&state);
let log_path = log_path.clone();
tasks.push(tokio::spawn(async move {
if path.ends_with("/StreamingPull") {
handle_pubsub_streaming_pull(body, respond, state, &log_path).await;
} else {
let body = pubsub_grpc_read_all(body).await;
handle_pubsub_unary(&path, &body, respond, state, &log_path).await;
}
}));
}
for task in tasks {
let _ = task.await;
}
}
@ -4307,7 +4321,10 @@ mod tests {
.send_data(bytes::Bytes::from(pubsub_grpc_frame(&init_payload)), false)
.unwrap();
let response = response.await.unwrap();
let response = tokio::time::timeout(Duration::from_secs(2), response)
.await
.expect("streaming pull response headers timed out")
.unwrap();
assert_eq!(response.status(), 200);
let mut body = response.into_body();
let mut response_buffer = Vec::new();

View file

@ -55,6 +55,11 @@ pub enum TraceStage {
OracleWait,
OracleObserved,
Verdict,
/// Track P.0 — the verifier assigned this finding to a cap-routed
/// concurrency lane. `detail` carries `cap=<name> lane=<n>` so a
/// trace consumer can audit how a mixed-cap batch fanned out across
/// lanes without head-of-line blocking.
WorkerLaneAssigned,
}
impl TraceStage {
@ -73,6 +78,7 @@ impl TraceStage {
Self::OracleWait => "oracle_wait",
Self::OracleObserved => "oracle_observed",
Self::Verdict => "verdict",
Self::WorkerLaneAssigned => "worker_lane_assigned",
}
}
}
@ -236,5 +242,9 @@ mod tests {
assert_eq!(TraceStage::OracleWait.as_str(), "oracle_wait");
assert_eq!(TraceStage::OracleObserved.as_str(), "oracle_observed");
assert_eq!(TraceStage::Verdict.as_str(), "verdict");
assert_eq!(
TraceStage::WorkerLaneAssigned.as_str(),
"worker_lane_assigned"
);
}
}

View file

@ -0,0 +1,87 @@
//! Phase 24 / Track P.0 acceptance tests for cap-routed concurrency lanes.
//!
//! The headline gate: a 64-finding mixed-cap batch run through
//! [`WorkerPool::run_in_lanes`] beats a single-lane (one-queue) baseline by
//! ≥ 3×, because a slow `DESERIALIZE` harness can no longer head-of-line
//! block the fast `SSRF` ones — every cap drains its own lanes concurrently.
//!
//! The perf assertion is `#[ignore]` so the default suite stays hermetic and
//! fast; the ordering/correctness check runs by default.
#![cfg(feature = "dynamic")]
use std::time::{Duration, Instant};
use nyx_scanner::dynamic::runner::WorkerPool;
use nyx_scanner::labels::Cap;
/// Realistic OWASP-scale mix: mostly parallelisable `SSRF`, a minority of slow
/// `DESERIALIZE`, and a few single-lane `CRYPTO`.
fn mixed_batch() -> Vec<Cap> {
(0..64)
.map(|i| match i % 8 {
0 => Cap::DESERIALIZE,
1 => Cap::CRYPTO,
_ => Cap::SSRF,
})
.collect()
}
/// Simulated per-finding verify cost: `DESERIALIZE` is the slow JVM/gadget
/// harness; everything else is cheap.
fn simulated_cost(cap: Cap) -> Duration {
if cap.contains(Cap::DESERIALIZE) {
Duration::from_millis(24)
} else {
Duration::from_millis(4)
}
}
#[test]
fn run_in_lanes_preserves_order_and_runs_all() {
let batch = mixed_batch();
let out = WorkerPool::run_in_lanes(&batch, None, |c| *c, |i, _| i * 2);
assert_eq!(out.len(), batch.len());
// Output indexed by input position regardless of lane scheduling.
assert_eq!(out, (0..batch.len()).map(|i| i * 2).collect::<Vec<_>>());
}
#[test]
#[ignore = "Phase 24 perf bench: 64-finding mixed-cap batch ≥ 3× vs single-lane. Opt-in so the default suite stays hermetic + fast. Run: cargo nextest run --features dynamic --run-ignored ignored-only -E 'binary(~workdir_clone)'"]
fn cap_lanes_beat_single_lane_by_3x() {
let batch = mixed_batch();
// Single-lane baseline: one queue, strictly sequential — the pre-P.0
// behaviour where a slow cap blocks the whole batch.
let t0 = Instant::now();
let mut baseline_out = Vec::with_capacity(batch.len());
for (i, c) in batch.iter().enumerate() {
std::thread::sleep(simulated_cost(*c));
baseline_out.push(i);
}
let single_lane = t0.elapsed();
// Cap-routed lanes: every cap runs concurrently with its own worker budget.
let t1 = Instant::now();
let lane_out = WorkerPool::run_in_lanes(
&batch,
None,
|c| *c,
|i, c| {
std::thread::sleep(simulated_cost(*c));
i
},
);
let lanes = t1.elapsed();
assert_eq!(lane_out, baseline_out, "lanes must produce identical ordered results");
let speedup = single_lane.as_secs_f64() / lanes.as_secs_f64();
eprintln!(
"phase24 cap-lanes: single-lane {single_lane:.2?}, cap-lanes {lanes:.2?}, speedup {speedup:.2}×"
);
assert!(
lanes.as_secs_f64() * 3.0 <= single_lane.as_secs_f64(),
"phase24 acceptance gate: expected ≥ 3× speedup, got {speedup:.2}× (single={single_lane:?}, lanes={lanes:?})",
);
}