diff --git a/.claude/scheduled_tasks.lock b/.claude/scheduled_tasks.lock new file mode 100644 index 00000000..9c05338b --- /dev/null +++ b/.claude/scheduled_tasks.lock @@ -0,0 +1 @@ +{"sessionId":"6c158e05-a83e-4808-acf4-12ad7b0fe983","pid":8358,"procStart":"Fri May 29 15:24:35 2026","acquiredAt":1780071990470} \ No newline at end of file diff --git a/src/commands/scan.rs b/src/commands/scan.rs index 6607b9c7..c6a434c7 100644 --- a/src/commands/scan.rs +++ b/src/commands/scan.rs @@ -330,8 +330,41 @@ pub(crate) fn verify_findings_for_scan( } let telemetry_log = crate::dynamic::telemetry::log_path(); - for diag in diags { - let mut result = crate::dynamic::verify::verify_finding(diag, &opts); + + // Track P.0: route per-finding verification through cap-keyed concurrency + // lanes so a slow `DESERIALIZE` harness can't head-of-line block fast + // `SSRF` ones. `verify_finding` takes `&Diag`, so the parallel phase is a + // pure read; verdicts are applied back in input order afterwards, keeping + // the verdict sequence identical to the sequential path (determinism + // contract). `NYX_DYNAMIC_VERIFY_PARALLEL=0` forces the legacy loop. + let parallel = std::env::var("NYX_DYNAMIC_VERIFY_PARALLEL") + .map(|v| !matches!(v.trim(), "0" | "false" | "no" | "off")) + .unwrap_or(true); + + let results: Vec = if parallel && diags.len() > 1 { + let lane_trace = verbose.then(|| std::sync::Arc::new(crate::dynamic::trace::VerifyTrace::new())); + let out = crate::dynamic::runner::WorkerPool::run_in_lanes( + &*diags, + lane_trace.as_ref(), + |d| { + crate::labels::Cap::from_bits_truncate( + d.evidence.as_ref().map_or(0, |e| e.sink_caps), + ) + }, + |_, d| crate::dynamic::verify::verify_finding(d, &opts), + ); + if let Some(trace) = &lane_trace { + trace.print_to_stderr(); + } + out + } else { + diags + .iter() + .map(|d| crate::dynamic::verify::verify_finding(d, &opts)) + .collect() + }; + + for (diag, mut result) in diags.iter_mut().zip(results) { if result.status == crate::dynamic::report::VerifyStatus::Confirmed && let Some(ref log_path) = telemetry_log { diff --git a/src/dynamic/harness.rs b/src/dynamic/harness.rs index da855940..7858226b 100644 --- a/src/dynamic/harness.rs +++ b/src/dynamic/harness.rs @@ -17,6 +17,7 @@ use crate::dynamic::lang; use crate::dynamic::spec::HarnessSpec; use crate::evidence::UnsupportedReason; use std::fs; +use std::io; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; @@ -183,7 +184,7 @@ fn copy_entry_file(spec: &HarnessSpec, workdir: &Path, entry_subpath: Option<&st let _ = fs::write(&dst, rewritten.as_bytes()); return; } - let _ = fs::copy(src, &dst); + let _ = copy_workdir(src, &dst); return; } } @@ -247,7 +248,7 @@ fn copy_java_sibling_sources(spec: &HarnessSpec, workdir: &Path) { continue; }; if name == "pom.xml" { - let _ = fs::copy(&p, workdir.join(name)); + let _ = copy_workdir(&p, &workdir.join(name)); continue; } if !p.extension().map(|e| e == "java").unwrap_or(false) { @@ -256,7 +257,7 @@ fn copy_java_sibling_sources(spec: &HarnessSpec, workdir: &Path) { if name == entry_name || name == alt_name { continue; } - let _ = fs::copy(&p, workdir.join(name)); + let _ = copy_workdir(&p, &workdir.join(name)); } } @@ -269,10 +270,10 @@ fn copy_php_project_manifests(spec: &HarnessSpec, workdir: &Path) { while let Some(current) = dir { let composer_json = current.join("composer.json"); if composer_json.exists() { - let _ = fs::copy(&composer_json, workdir.join("composer.json")); + let _ = copy_workdir(&composer_json, &workdir.join("composer.json")); let composer_lock = current.join("composer.lock"); if composer_lock.exists() { - let _ = fs::copy(composer_lock, workdir.join("composer.lock")); + let _ = copy_workdir(&composer_lock, &workdir.join("composer.lock")); } return; } @@ -280,6 +281,176 @@ fn copy_php_project_manifests(spec: &HarnessSpec, workdir: &Path) { } } +/// Copy-on-write clone of `src` into `dst` (Track P.0). +/// +/// Per-finding workdir staging used to `std::fs::copy` every harness file, +/// paying a full byte copy for each of the 50+ findings an OWASP run touches. +/// On a CoW filesystem the kernel can share the underlying extents instead, so +/// setup cost drops from tens of milliseconds to near zero: +/// +/// - **macOS** — `clonefile(2)` clones a file *or an entire directory tree* in +/// a single syscall (the [`clone_dir`] fast path). +/// - **Linux** — `ioctl(FICLONE)` reflinks on btrfs/xfs; `copy_file_range(2)` +/// is the ext4 fallback (in-kernel copy, reflink when the FS supports it). +/// - **Anywhere else / unsupported FS** — falls back to `std::fs::copy`, so +/// behaviour is identical, only slower. +/// +/// The top-level `src` is resolved through symlinks (mirroring the `fs::copy` +/// semantics the staging code relied on, so a symlinked entry file copies its +/// target's contents). Symlinks *inside* a cloned tree are preserved verbatim +/// so a baseline snapshot keeps the toolchain's `node_modules/.bin` / +/// `vendor` link structure intact. +pub(crate) fn copy_workdir(src: &Path, dst: &Path) -> io::Result<()> { + let meta = fs::metadata(src)?; + if meta.is_dir() { + clone_dir(src, dst) + } else { + clone_file(src, dst) + } +} + +/// Recursively clone a directory tree, preserving internal symlinks. +fn clone_dir(src: &Path, dst: &Path) -> io::Result<()> { + // macOS: `clonefile` clones the whole tree (CoW) in one syscall when the + // destination does not yet exist — the P50 ≤ 5ms baseline-snapshot path. + #[cfg(target_os = "macos")] + if !dst.exists() && clonefile_cow(src, dst).is_ok() { + return Ok(()); + } + fs::create_dir_all(dst)?; + for entry in fs::read_dir(src)? { + let entry = entry?; + let from = entry.path(); + let to = dst.join(entry.file_name()); + let ft = entry.file_type()?; + if ft.is_symlink() { + copy_symlink(&from, &to)?; + } else if ft.is_dir() { + clone_dir(&from, &to)?; + } else { + clone_file(&from, &to)?; + } + } + Ok(()) +} + +/// CoW-clone a single regular file, falling back to a byte copy. +fn clone_file(src: &Path, dst: &Path) -> io::Result<()> { + #[cfg(target_os = "macos")] + if clonefile_cow(src, dst).is_ok() { + return Ok(()); + } + #[cfg(target_os = "linux")] + if reflink_cow(src, dst).is_ok() { + return Ok(()); + } + fs::copy(src, dst).map(|_| ()) +} + +/// Recreate `src` (a symlink) at `dst` rather than following it. +fn copy_symlink(src: &Path, dst: &Path) -> io::Result<()> { + let _ = fs::remove_file(dst); + #[cfg(unix)] + { + let target = fs::read_link(src)?; + std::os::unix::fs::symlink(target, dst) + } + #[cfg(not(unix))] + { + // No portable symlink API: copy the resolved file contents. + clone_file(src, dst) + } +} + +/// macOS `clonefile(2)` wrapper. Honours overwrite semantics by removing an +/// existing destination first (`clonefile` fails with `EEXIST` otherwise). +#[cfg(target_os = "macos")] +fn clonefile_cow(src: &Path, dst: &Path) -> io::Result<()> { + use std::ffi::CString; + use std::os::unix::ffi::OsStrExt; + + unsafe extern "C" { + fn clonefile(src: *const i8, dst: *const i8, flags: u32) -> i32; + } + + let _ = fs::remove_file(dst); + let csrc = CString::new(src.as_os_str().as_bytes()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let cdst = CString::new(dst.as_os_str().as_bytes()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + // flags = 0: follow a symlinked `src` and clone its target. + let ret = unsafe { clonefile(csrc.as_ptr(), cdst.as_ptr(), 0) }; + if ret == 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } +} + +/// Linux CoW clone: `ioctl(FICLONE)` reflink first, `copy_file_range(2)` +/// fallback. Preserves the source mode so cloned toolchain binaries keep +/// their executable bit. +#[cfg(target_os = "linux")] +fn reflink_cow(src: &Path, dst: &Path) -> io::Result<()> { + use std::os::unix::io::AsRawFd; + + // FICLONE = _IOW(0x94, 9, int) on the asm-generic ABI (x86_64, aarch64). + const FICLONE: u64 = 0x4004_9409; + + unsafe extern "C" { + fn ioctl(fd: i32, request: u64, ...) -> i32; + fn copy_file_range( + fd_in: i32, + off_in: *mut i64, + fd_out: i32, + off_out: *mut i64, + len: usize, + flags: u32, + ) -> isize; + } + + let src_file = fs::File::open(src)?; + let meta = src_file.metadata()?; + let dst_file = fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(dst)?; + + let src_fd = src_file.as_raw_fd(); + let dst_fd = dst_file.as_raw_fd(); + + // Fast path: whole-file reflink (btrfs/xfs). + let cloned = unsafe { ioctl(dst_fd, FICLONE, src_fd) } == 0; + if !cloned { + // ext4 / overlayfs fallback: in-kernel copy (reflink when supported). + let mut remaining = meta.len() as usize; + while remaining > 0 { + let n = unsafe { + copy_file_range( + src_fd, + std::ptr::null_mut(), + dst_fd, + std::ptr::null_mut(), + remaining, + 0, + ) + }; + if n < 0 { + return Err(io::Error::last_os_error()); + } + if n == 0 { + break; // short source / EOF + } + remaining -= n as usize; + } + } + + // Neither FICLONE nor copy_file_range copies the mode bits. + fs::set_permissions(dst, meta.permissions())?; + Ok(()) +} + /// Extract the source of the entry file (for repro bundles). Best-effort. fn extract_entry_source(spec: &HarnessSpec) -> String { let candidates = [ @@ -437,4 +608,85 @@ mod tests { assert!(harness.workdir.join("pom.xml").exists()); assert!(!harness.workdir.join("Benign.java").exists()); } + + #[test] + fn copy_workdir_clones_file_contents() { + let tmp = tempfile::TempDir::new().unwrap(); + let src = tmp.path().join("src.txt"); + let dst = tmp.path().join("dst.txt"); + fs::write(&src, b"hello clonefile\n").unwrap(); + copy_workdir(&src, &dst).unwrap(); + assert_eq!(fs::read(&dst).unwrap(), b"hello clonefile\n"); + } + + #[test] + fn copy_workdir_overwrites_existing_dest() { + let tmp = tempfile::TempDir::new().unwrap(); + let src = tmp.path().join("src.txt"); + let dst = tmp.path().join("dst.txt"); + fs::write(&src, b"new contents").unwrap(); + fs::write(&dst, b"STALE STALE STALE").unwrap(); + copy_workdir(&src, &dst).unwrap(); + assert_eq!(fs::read(&dst).unwrap(), b"new contents"); + } + + #[test] + fn copy_workdir_clones_directory_tree() { + let tmp = tempfile::TempDir::new().unwrap(); + let src = tmp.path().join("tree"); + fs::create_dir_all(src.join("nested")).unwrap(); + fs::write(src.join("top.txt"), b"top").unwrap(); + fs::write(src.join("nested").join("deep.txt"), b"deep").unwrap(); + let dst = tmp.path().join("clone"); + copy_workdir(&src, &dst).unwrap(); + assert_eq!(fs::read(dst.join("top.txt")).unwrap(), b"top"); + assert_eq!(fs::read(dst.join("nested").join("deep.txt")).unwrap(), b"deep"); + } + + #[cfg(unix)] + #[test] + fn copy_workdir_preserves_internal_symlinks() { + let tmp = tempfile::TempDir::new().unwrap(); + let src = tmp.path().join("tree"); + fs::create_dir_all(&src).unwrap(); + fs::write(src.join("real.txt"), b"real").unwrap(); + std::os::unix::fs::symlink("real.txt", src.join("link.txt")).unwrap(); + let dst = tmp.path().join("clone"); + copy_workdir(&src, &dst).unwrap(); + let link = dst.join("link.txt"); + assert!( + fs::symlink_metadata(&link).unwrap().file_type().is_symlink(), + "internal symlink must be preserved, not dereferenced" + ); + assert_eq!(fs::read(&link).unwrap(), b"real"); + } + + #[test] + #[ignore = "Phase 24 perf bench: per-finding workdir clone P50 ≤ 5ms (CoW). Opt-in so the default suite stays hermetic + fast. Run: cargo nextest run --features dynamic --run-ignored ignored-only -E 'test(~copy_workdir_perf)'"] + fn copy_workdir_perf_p50_under_5ms() { + use std::time::{Duration, Instant}; + let tmp = tempfile::TempDir::new().unwrap(); + // Representative harness workdir: entry source + siblings + manifest. + let src = tmp.path().join("src"); + fs::create_dir_all(&src).unwrap(); + fs::write(src.join("Vuln.java"), "public class Vuln {}\n".repeat(60)).unwrap(); + fs::write(src.join("Helper.java"), "class Helper {}\n".repeat(20)).unwrap(); + fs::write(src.join("pom.xml"), "\n".repeat(30)).unwrap(); + + let n = 50usize; + let mut samples = Vec::with_capacity(n); + for i in 0..n { + let dst = tmp.path().join(format!("clone{i}")); + let t = Instant::now(); + copy_workdir(&src, &dst).unwrap(); + samples.push(t.elapsed()); + } + samples.sort(); + let p50 = samples[n / 2]; + eprintln!("phase24 copy_workdir: P50 = {p50:?} over {n} clones"); + assert!( + p50 <= Duration::from_millis(5), + "phase24 acceptance gate: workdir clone P50 {p50:?}, expected ≤ 5ms" + ); + } } diff --git a/src/dynamic/runner.rs b/src/dynamic/runner.rs index 20a37988..67c5f659 100644 --- a/src/dynamic/runner.rs +++ b/src/dynamic/runner.rs @@ -20,8 +20,10 @@ use crate::dynamic::spec::HarnessSpec; use crate::dynamic::stubs::StubEvent; use crate::dynamic::trace::{TraceStage, VerifyTrace}; use crate::evidence::{DifferentialOutcome, DifferentialVerdict}; +use crate::labels::Cap; use crate::symbol::Lang; -use std::sync::Arc; +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; /// Record a trace event on the caller's [`VerifyTrace`] handle if one /// was attached to [`SandboxOptions::trace`]. No-op otherwise — keeps @@ -727,6 +729,155 @@ fn generate_nonce() -> String { format!("{mixed:016x}") } +/// Per-lane bounded-channel capacity (Track P.0). +/// +/// Small on purpose: lanes are backpressure-bounded so a fast feeder cannot +/// queue the whole batch ahead of a slow worker, but large enough that a +/// worker never starves waiting on the feeder for the next item. +const LANE_CHANNEL_CAP: usize = 4; + +/// Cap-routed concurrency lanes for batched verification (Track P.0). +/// +/// A single-queue verifier lets one slow `DESERIALIZE` harness (JVM spin-up, +/// gadget-chain payloads) head-of-line block a queue full of fast `SSRF` +/// findings. [`WorkerPool::run_in_lanes`] instead routes each finding to a +/// lane keyed by its capability: every cap drains its *own* set of bounded +/// channels with a per-cap worker budget from [`WorkerPool::lanes_for_cap`], +/// and all caps run concurrently, so a slow cap throttles only itself. +/// +/// Results are returned in input order regardless of lane scheduling, so the +/// verdict sequence stays deterministic (the engine's determinism contract is +/// about verdicts, not wall-clock interleaving). +pub struct WorkerPool; + +impl WorkerPool { + /// Concurrency budget for `cap`'s lanes. + /// + /// Verification is dominated by per-harness subprocess wall-time, not CPU, + /// so wide lanes for cheap independent caps (SSRF) pay off even past the + /// core count, while expensive caps stay narrow so one harness can't + /// monopolise the host. Expensive caps are checked first so a combined + /// cap-set inherits the *narrower* lane. + pub fn lanes_for_cap(cap: Cap) -> usize { + if cap.contains(Cap::CRYPTO) { + 1 + } else if cap.contains(Cap::DESERIALIZE) || cap.contains(Cap::CODE_EXEC) { + 2 + } else if cap.contains(Cap::SSRF) { + 8 + } else { + 4 + } + } + + /// Run `work(i, &items[i])` for every item, routed through per-cap lanes. + /// + /// `cap_of` extracts the routing capability for each item. Returns one + /// output per input, in input order. Empty / single-item batches run + /// inline (no threads) so trivial scans pay no concurrency overhead. + /// + /// `trace`, when present, receives a deterministic + /// [`TraceStage::WorkerLaneAssigned`] event per item (recorded in a + /// single-threaded pre-pass so the trace order does not depend on lane + /// scheduling). + pub fn run_in_lanes( + items: &[I], + trace: Option<&Arc>, + cap_of: C, + work: W, + ) -> Vec + where + I: Sync, + O: Send, + C: Fn(&I) -> Cap + Sync, + W: Fn(usize, &I) -> O + Sync, + { + // Group item indices by cap (BTreeMap over the raw bits keeps both the + // pre-pass trace and lane spawning in a stable, reproducible order). + let mut groups: BTreeMap> = BTreeMap::new(); + for (i, item) in items.iter().enumerate() { + groups.entry(cap_of(item).bits()).or_default().push(i); + } + + // Deterministic lane-assignment trace, single-threaded. + if trace.is_some() { + for (bits, idxs) in &groups { + let cap = Cap::from_bits_truncate(*bits); + let lanes = Self::lanes_for_cap(cap).max(1); + for (pos, _) in idxs.iter().enumerate() { + trace_record( + trace, + TraceStage::WorkerLaneAssigned, + Some(format!( + "cap={} lane={}", + crate::labels::cap_to_name(cap), + pos % lanes + )), + ); + } + } + } + + // Inline fast path: nothing to parallelise. + if items.len() <= 1 { + return items + .iter() + .enumerate() + .map(|(i, it)| work(i, it)) + .collect(); + } + + let results: Vec>> = + (0..items.len()).map(|_| Mutex::new(None)).collect(); + + std::thread::scope(|scope| { + let results = &results; + let work = &work; + for (bits, idxs) in groups { + let cap = Cap::from_bits_truncate(bits); + let lanes = Self::lanes_for_cap(cap).max(1); + + // One bounded channel + one worker per lane. + let mut senders = Vec::with_capacity(lanes); + for _ in 0..lanes { + let (tx, rx) = crossbeam_channel::bounded::(LANE_CHANNEL_CAP); + senders.push(tx); + scope.spawn(move || { + while let Ok(idx) = rx.recv() { + let out = work(idx, &items[idx]); + if let Ok(mut slot) = results[idx].lock() { + *slot = Some(out); + } + } + }); + } + + // Dedicated feeder per cap so feeding one group never blocks + // another group's workers from starting (cross-cap isolation). + scope.spawn(move || { + for (pos, idx) in idxs.into_iter().enumerate() { + let lane = pos % lanes; + if senders[lane].send(idx).is_err() { + break; + } + } + // `senders` drops here → each lane's rx closes → worker exits. + }); + } + }); + + results + .into_iter() + .map(|m| { + m.into_inner() + .ok() + .flatten() + .expect("every lane worker writes its result slot") + }) + .collect() + } +} + #[cfg(test)] mod tests { use super::*; @@ -802,4 +953,87 @@ mod tests { ); assert!(is_runtime_import_error(&outcome)); } + + #[test] + fn lanes_for_cap_matches_table() { + assert_eq!(WorkerPool::lanes_for_cap(Cap::SSRF), 8); + assert_eq!(WorkerPool::lanes_for_cap(Cap::DESERIALIZE), 2); + assert_eq!(WorkerPool::lanes_for_cap(Cap::CODE_EXEC), 2); + assert_eq!(WorkerPool::lanes_for_cap(Cap::CRYPTO), 1); + // Unlisted cap falls back to the default lane width. + assert_eq!(WorkerPool::lanes_for_cap(Cap::SQL_QUERY), 4); + // Expensive cap wins a combined cap-set (narrower lane). + assert_eq!(WorkerPool::lanes_for_cap(Cap::SSRF | Cap::CRYPTO), 1); + } + + #[test] + fn run_in_lanes_preserves_input_order() { + // Mixed caps across many items: results must come back indexed by + // input position regardless of which lane finished first. + let caps = [ + Cap::SSRF, + Cap::DESERIALIZE, + Cap::CRYPTO, + Cap::SQL_QUERY, + Cap::SSRF, + Cap::CRYPTO, + ]; + let items: Vec<(usize, Cap)> = caps.iter().copied().enumerate().collect(); + let out = WorkerPool::run_in_lanes( + &items, + None, + |&(_, cap)| cap, + |i, &(orig, _)| { + assert_eq!(i, orig); + orig * 10 + }, + ); + assert_eq!(out, vec![0, 10, 20, 30, 40, 50]); + } + + #[test] + fn run_in_lanes_runs_every_item_once() { + use std::sync::atomic::{AtomicUsize, Ordering}; + let items: Vec = (0..64) + .map(|i| match i % 4 { + 0 => Cap::SSRF, + 1 => Cap::DESERIALIZE, + 2 => Cap::CRYPTO, + _ => Cap::SQL_QUERY, + }) + .collect(); + let calls = AtomicUsize::new(0); + let out = WorkerPool::run_in_lanes( + &items, + None, + |c| *c, + |i, _| { + calls.fetch_add(1, Ordering::Relaxed); + i + }, + ); + assert_eq!(calls.load(Ordering::Relaxed), 64); + assert_eq!(out, (0..64).collect::>()); + } + + #[test] + fn run_in_lanes_emits_deterministic_lane_trace() { + let items = [Cap::SSRF, Cap::CRYPTO, Cap::SSRF]; + let trace_a = Arc::new(VerifyTrace::new()); + let _ = WorkerPool::run_in_lanes(&items, Some(&trace_a), |c| *c, |i, _| i); + let trace_b = Arc::new(VerifyTrace::new()); + let _ = WorkerPool::run_in_lanes(&items, Some(&trace_b), |c| *c, |i, _| i); + + let events_a = trace_a.events(); + // One WorkerLaneAssigned per item. + assert_eq!( + events_a + .iter() + .filter(|e| e.stage == TraceStage::WorkerLaneAssigned) + .count(), + 3 + ); + // Deterministic across runs. + assert_eq!(trace_a.to_jsonl(), trace_b.to_jsonl()); + } } diff --git a/src/dynamic/sandbox/baseline.rs b/src/dynamic/sandbox/baseline.rs new file mode 100644 index 00000000..801dda7e --- /dev/null +++ b/src/dynamic/sandbox/baseline.rs @@ -0,0 +1,258 @@ +//! Prewarmed sandbox baseline directories (Track P.0). +//! +//! A harness needs the language toolchain's heavyweight dependency tree +//! (`node_modules`, `vendor`, `target/`, …) but that tree is identical across +//! every finding in a run — installing it per-finding is the bulk of the +//! per-workdir setup cost. A [`Baseline`] holds one shared, warmed copy under +//! the build-pool cache dir; each per-finding workdir gets a cheap snapshot of +//! it: +//! +//! - **macOS** — a `clonefile` CoW snapshot (via +//! [`crate::dynamic::harness::copy_workdir`]). +//! - **Linux** — a read-only `mount --bind`, falling back to a reflink copy +//! when bind mounts are unavailable (no `CAP_SYS_ADMIN` / not in a mount +//! namespace). +//! +//! The baseline root honours `NYX_BUILD_POOL_DIR` through +//! [`crate::dynamic::build_pool::pool_cache_dir`], so tests can redirect it +//! into a `TempDir` and it shares the same on-disk layout as the Phase 22/23 +//! build pools (`/dynamic/build-pool//baseline`). + +use crate::symbol::Lang; +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; + +/// Canonical pinned toolchain subdirectories per language. +/// +/// These are the content-addressed dependency trees a harness needs but that +/// never change between findings, so they are warmed once in the shared +/// baseline and snapshotted into each per-finding workdir. Languages whose +/// harnesses carry no pinned tree (C / C++) return an empty slice. +pub fn pinned_subdirs(lang: Lang) -> &'static [&'static str] { + match lang { + Lang::JavaScript | Lang::TypeScript => &["node_modules"], + Lang::Php => &["vendor"], + Lang::Ruby => &["vendor/bundle"], + Lang::Rust => &["target"], + Lang::Go => &["go-pkg"], + Lang::Python => &[".venv"], + Lang::Java => &["lib"], + Lang::C | Lang::Cpp => &[], + } +} + +/// Build-pool cache slug for `lang` — matches the Phase 22/23 pool layout so +/// the baseline lives next to its toolchain's pool caches. +fn lang_slug(lang: Lang) -> &'static str { + match lang { + Lang::JavaScript | Lang::TypeScript => "node", + Lang::Python => "python", + Lang::Php => "php", + Lang::Ruby => "ruby", + Lang::Go => "go", + Lang::Rust => "rust", + Lang::Java => "java", + Lang::C => "c", + Lang::Cpp => "cpp", + } +} + +/// A shared, prewarmed baseline directory for one language toolchain. +pub struct Baseline { + lang: Lang, + root: PathBuf, +} + +impl Baseline { + /// Locate (and create) the shared baseline root for `lang`. + /// + /// Returns `None` only when no cache dir is available (neither + /// `NYX_BUILD_POOL_DIR` nor a platform cache dir) — callers then skip the + /// baseline and stage the workdir the legacy way. + pub fn ensure(lang: Lang) -> Option { + let root = crate::dynamic::build_pool::pool_cache_dir(lang_slug(lang), "baseline")?; + Some(Self { lang, root }) + } + + /// Root directory holding the warmed pinned subdirs. + pub fn root(&self) -> &Path { + &self.root + } + + /// True when at least one pinned subdir is present and non-empty — i.e. a + /// prior `prepare_*` build has warmed the baseline. A cold baseline makes + /// [`Self::snapshot_into`] a no-op so the caller falls back to a normal + /// per-workdir install. + pub fn is_warm(&self) -> bool { + pinned_subdirs(self.lang).iter().any(|sub| { + let p = self.root.join(sub); + p.is_dir() + && fs::read_dir(&p) + .map(|mut d| d.next().is_some()) + .unwrap_or(false) + }) + } + + /// Snapshot every warmed pinned subdir into `workdir`. + /// + /// macOS uses a `clonefile` CoW snapshot; Linux attempts a read-only + /// `mount --bind` and falls back to a reflink copy when bind mounts are + /// unavailable. Missing subdirs are skipped, so a partially warmed + /// baseline still snapshots what it has. + pub fn snapshot_into(&self, workdir: &Path) -> io::Result<()> { + for sub in pinned_subdirs(self.lang) { + let src = self.root.join(sub); + if !src.is_dir() { + continue; + } + let dst = workdir.join(sub); + if let Some(parent) = dst.parent() { + fs::create_dir_all(parent)?; + } + #[cfg(target_os = "linux")] + if bind_mount_ro(&src, &dst).is_ok() { + continue; + } + crate::dynamic::harness::copy_workdir(&src, &dst)?; + } + Ok(()) + } +} + +/// Read-only `mount --bind src dst` on Linux. +/// +/// A bind mount cannot be made read-only in a single call: Linux applies the +/// `MS_RDONLY` flag only on a subsequent `MS_REMOUNT`. A failed remount leaves +/// the read-write bind in place (still far cheaper than a copy), so the harness +/// gets the dependency tree either way; the read-only guarantee is best-effort. +#[cfg(target_os = "linux")] +fn bind_mount_ro(src: &Path, dst: &Path) -> io::Result<()> { + use std::ffi::CString; + use std::os::unix::ffi::OsStrExt; + + unsafe extern "C" { + fn mount( + src: *const core::ffi::c_char, + target: *const core::ffi::c_char, + fstype: *const core::ffi::c_char, + flags: u64, + data: *const core::ffi::c_void, + ) -> i32; + } + + const MS_RDONLY: u64 = 0x1; + const MS_REMOUNT: u64 = 0x20; + const MS_BIND: u64 = 0x1000; + const MS_REC: u64 = 0x4000; + + fs::create_dir_all(dst)?; + let csrc = + CString::new(src.as_os_str().as_bytes()).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let cdst = + CString::new(dst.as_os_str().as_bytes()).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + + let bind = unsafe { + mount( + csrc.as_ptr(), + cdst.as_ptr(), + std::ptr::null(), + MS_BIND | MS_REC, + std::ptr::null(), + ) + }; + if bind != 0 { + return Err(io::Error::last_os_error()); + } + // Best-effort read-only remount; leave the rw bind if it fails. + unsafe { + mount( + std::ptr::null(), + cdst.as_ptr(), + std::ptr::null(), + MS_BIND | MS_REMOUNT | MS_RDONLY | MS_REC, + std::ptr::null(), + ) + }; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Mutex, MutexGuard}; + + static ENV_LOCK: Mutex<()> = Mutex::new(()); + + struct PoolDirGuard { + _lock: MutexGuard<'static, ()>, + prior: Option, + } + + impl PoolDirGuard { + fn set(path: &Path) -> Self { + let lock = ENV_LOCK + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + let prior = std::env::var("NYX_BUILD_POOL_DIR").ok(); + unsafe { std::env::set_var("NYX_BUILD_POOL_DIR", path) }; + Self { _lock: lock, prior } + } + } + + impl Drop for PoolDirGuard { + fn drop(&mut self) { + match self.prior.take() { + Some(v) => unsafe { std::env::set_var("NYX_BUILD_POOL_DIR", v) }, + None => unsafe { std::env::remove_var("NYX_BUILD_POOL_DIR") }, + } + } + } + + #[test] + fn pinned_subdirs_cover_dependency_trees() { + assert_eq!(pinned_subdirs(Lang::JavaScript), &["node_modules"]); + assert_eq!(pinned_subdirs(Lang::Php), &["vendor"]); + assert_eq!(pinned_subdirs(Lang::Rust), &["target"]); + assert!(pinned_subdirs(Lang::C).is_empty()); + } + + #[test] + fn cold_baseline_is_not_warm() { + let tmp = tempfile::TempDir::new().unwrap(); + let _g = PoolDirGuard::set(tmp.path()); + let baseline = Baseline::ensure(Lang::JavaScript).expect("baseline root"); + assert!(!baseline.is_warm(), "empty baseline must be cold"); + } + + #[test] + fn warm_baseline_snapshots_into_workdir() { + let tmp = tempfile::TempDir::new().unwrap(); + let _g = PoolDirGuard::set(tmp.path()); + let baseline = Baseline::ensure(Lang::JavaScript).expect("baseline root"); + + // Warm the baseline: write a fake node_modules tree into the root. + let pkg = baseline.root().join("node_modules").join("left-pad"); + fs::create_dir_all(&pkg).unwrap(); + fs::write(pkg.join("index.js"), b"module.exports = 1;\n").unwrap(); + assert!(baseline.is_warm(), "populated baseline must report warm"); + + // Snapshot it into a fresh per-finding workdir. + let workdir = tempfile::TempDir::new().unwrap(); + baseline.snapshot_into(workdir.path()).unwrap(); + let cloned = workdir.path().join("node_modules").join("left-pad").join("index.js"); + assert!(cloned.exists(), "snapshot must materialise node_modules"); + assert_eq!(fs::read(&cloned).unwrap(), b"module.exports = 1;\n"); + } + + #[test] + fn snapshot_of_cold_baseline_is_noop() { + let tmp = tempfile::TempDir::new().unwrap(); + let _g = PoolDirGuard::set(tmp.path()); + let baseline = Baseline::ensure(Lang::Rust).expect("baseline root"); + let workdir = tempfile::TempDir::new().unwrap(); + // No pinned subdir present → snapshot succeeds and writes nothing. + baseline.snapshot_into(workdir.path()).unwrap(); + assert!(!workdir.path().join("target").exists()); + } +} diff --git a/src/dynamic/sandbox/mod.rs b/src/dynamic/sandbox/mod.rs index 800c5e95..eab5c39e 100644 --- a/src/dynamic/sandbox/mod.rs +++ b/src/dynamic/sandbox/mod.rs @@ -86,6 +86,15 @@ pub enum HardeningRecord { /// pin when one is available. pub mod docker; +/// Phase 24 (Track P.0) — prewarmed sandbox baseline directories. +/// +/// Holds one shared, warmed copy of each language toolchain's pinned +/// dependency tree (`node_modules`, `vendor`, `target/`, …) and CoW-snapshots +/// (macOS) or read-only bind-mounts (Linux) it into every per-finding workdir, +/// so per-workdir setup cost collapses from a full dependency install to a +/// near-free clone. +pub mod baseline; + // ── Harness interpretation probe ────────────────────────────────────────────── /// Returns true when the harness is driven by an interpreter (Python, Node, …) diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs index eba0b783..09cd05cf 100644 --- a/src/dynamic/stubs/broker.rs +++ b/src/dynamic/stubs/broker.rs @@ -1539,18 +1539,32 @@ async fn handle_pubsub_grpc_connection( let Ok(mut connection) = h2::server::handshake(stream).await else { return; }; + // Each request runs as its own task so the accept loop keeps polling the + // h2 connection. The single connection future is the sole driver of socket + // I/O: a StreamingPull handler awaits client frames and flushes responses + // across many turns, so running it inline would starve the driver — the + // queued response could never flush and the client's `response.await` + // would park forever (the pre-fix deadhang). + let mut tasks = Vec::new(); while let Some(request) = connection.accept().await { let Ok((request, respond)) = request else { break; }; let path = request.uri().path().to_owned(); let body = request.into_body(); - if path.ends_with("/StreamingPull") { - handle_pubsub_streaming_pull(body, respond, Arc::clone(&state), &log_path).await; - } else { - let body = pubsub_grpc_read_all(body).await; - handle_pubsub_unary(&path, &body, respond, Arc::clone(&state), &log_path).await; - } + let state = Arc::clone(&state); + let log_path = log_path.clone(); + tasks.push(tokio::spawn(async move { + if path.ends_with("/StreamingPull") { + handle_pubsub_streaming_pull(body, respond, state, &log_path).await; + } else { + let body = pubsub_grpc_read_all(body).await; + handle_pubsub_unary(&path, &body, respond, state, &log_path).await; + } + })); + } + for task in tasks { + let _ = task.await; } } @@ -4307,7 +4321,10 @@ mod tests { .send_data(bytes::Bytes::from(pubsub_grpc_frame(&init_payload)), false) .unwrap(); - let response = response.await.unwrap(); + let response = tokio::time::timeout(Duration::from_secs(2), response) + .await + .expect("streaming pull response headers timed out") + .unwrap(); assert_eq!(response.status(), 200); let mut body = response.into_body(); let mut response_buffer = Vec::new(); diff --git a/src/dynamic/trace.rs b/src/dynamic/trace.rs index 94d4fe6d..3910750c 100644 --- a/src/dynamic/trace.rs +++ b/src/dynamic/trace.rs @@ -55,6 +55,11 @@ pub enum TraceStage { OracleWait, OracleObserved, Verdict, + /// Track P.0 — the verifier assigned this finding to a cap-routed + /// concurrency lane. `detail` carries `cap= lane=` so a + /// trace consumer can audit how a mixed-cap batch fanned out across + /// lanes without head-of-line blocking. + WorkerLaneAssigned, } impl TraceStage { @@ -73,6 +78,7 @@ impl TraceStage { Self::OracleWait => "oracle_wait", Self::OracleObserved => "oracle_observed", Self::Verdict => "verdict", + Self::WorkerLaneAssigned => "worker_lane_assigned", } } } @@ -236,5 +242,9 @@ mod tests { assert_eq!(TraceStage::OracleWait.as_str(), "oracle_wait"); assert_eq!(TraceStage::OracleObserved.as_str(), "oracle_observed"); assert_eq!(TraceStage::Verdict.as_str(), "verdict"); + assert_eq!( + TraceStage::WorkerLaneAssigned.as_str(), + "worker_lane_assigned" + ); } } diff --git a/tests/dynamic_workdir_clone.rs b/tests/dynamic_workdir_clone.rs new file mode 100644 index 00000000..23cc5ce6 --- /dev/null +++ b/tests/dynamic_workdir_clone.rs @@ -0,0 +1,87 @@ +//! Phase 24 / Track P.0 acceptance tests for cap-routed concurrency lanes. +//! +//! The headline gate: a 64-finding mixed-cap batch run through +//! [`WorkerPool::run_in_lanes`] beats a single-lane (one-queue) baseline by +//! ≥ 3×, because a slow `DESERIALIZE` harness can no longer head-of-line +//! block the fast `SSRF` ones — every cap drains its own lanes concurrently. +//! +//! The perf assertion is `#[ignore]` so the default suite stays hermetic and +//! fast; the ordering/correctness check runs by default. + +#![cfg(feature = "dynamic")] + +use std::time::{Duration, Instant}; + +use nyx_scanner::dynamic::runner::WorkerPool; +use nyx_scanner::labels::Cap; + +/// Realistic OWASP-scale mix: mostly parallelisable `SSRF`, a minority of slow +/// `DESERIALIZE`, and a few single-lane `CRYPTO`. +fn mixed_batch() -> Vec { + (0..64) + .map(|i| match i % 8 { + 0 => Cap::DESERIALIZE, + 1 => Cap::CRYPTO, + _ => Cap::SSRF, + }) + .collect() +} + +/// Simulated per-finding verify cost: `DESERIALIZE` is the slow JVM/gadget +/// harness; everything else is cheap. +fn simulated_cost(cap: Cap) -> Duration { + if cap.contains(Cap::DESERIALIZE) { + Duration::from_millis(24) + } else { + Duration::from_millis(4) + } +} + +#[test] +fn run_in_lanes_preserves_order_and_runs_all() { + let batch = mixed_batch(); + let out = WorkerPool::run_in_lanes(&batch, None, |c| *c, |i, _| i * 2); + assert_eq!(out.len(), batch.len()); + // Output indexed by input position regardless of lane scheduling. + assert_eq!(out, (0..batch.len()).map(|i| i * 2).collect::>()); +} + +#[test] +#[ignore = "Phase 24 perf bench: 64-finding mixed-cap batch ≥ 3× vs single-lane. Opt-in so the default suite stays hermetic + fast. Run: cargo nextest run --features dynamic --run-ignored ignored-only -E 'binary(~workdir_clone)'"] +fn cap_lanes_beat_single_lane_by_3x() { + let batch = mixed_batch(); + + // Single-lane baseline: one queue, strictly sequential — the pre-P.0 + // behaviour where a slow cap blocks the whole batch. + let t0 = Instant::now(); + let mut baseline_out = Vec::with_capacity(batch.len()); + for (i, c) in batch.iter().enumerate() { + std::thread::sleep(simulated_cost(*c)); + baseline_out.push(i); + } + let single_lane = t0.elapsed(); + + // Cap-routed lanes: every cap runs concurrently with its own worker budget. + let t1 = Instant::now(); + let lane_out = WorkerPool::run_in_lanes( + &batch, + None, + |c| *c, + |i, c| { + std::thread::sleep(simulated_cost(*c)); + i + }, + ); + let lanes = t1.elapsed(); + + assert_eq!(lane_out, baseline_out, "lanes must produce identical ordered results"); + + let speedup = single_lane.as_secs_f64() / lanes.as_secs_f64(); + eprintln!( + "phase24 cap-lanes: single-lane {single_lane:.2?}, cap-lanes {lanes:.2?}, speedup {speedup:.2}×" + ); + assert!( + lanes.as_secs_f64() * 3.0 <= single_lane.as_secs_f64(), + "phase24 acceptance gate: expected ≥ 3× speedup, got {speedup:.2}× (single={single_lane:?}, lanes={lanes:?})", + ); +}