[pitboss] phase 10: Track D.3 — Stub services for sinks that cross a boundary

This commit is contained in:
pitboss 2026-05-14 14:18:09 -05:00
parent ac40a3ebed
commit 50f0729d01
35 changed files with 2034 additions and 9 deletions

View file

@ -49,7 +49,7 @@ docgen = []
# Dynamic verification layer: builds harnesses from findings, runs them in a
# sandbox, reports back whether the sink fires. Off by default until the
# static side is honest on real corpora (see ROADMAP.md).
dynamic = []
dynamic = ["dep:tempfile"]
[lib]
name = "nyx_scanner"
@ -129,6 +129,7 @@ tokio = { version = "1.52.3", features = ["rt-multi-thread", "macros", "signal",
tokio-stream = { version = "0.1.18", features = ["sync"], optional = true }
tower-http = { version = "0.6.10", features = ["cors", "compression-gzip", "trace", "set-header", "limit"], optional = true }
z3 = { version = "0.20.0", optional = true}
tempfile = { version = "3.27.0", optional = true }
[profile.release]
lto = true

View file

@ -950,6 +950,7 @@ mod tests {
sink_line: 10,
spec_hash: "test0000abcd1234".into(),
derivation: SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}

View file

@ -196,6 +196,7 @@ mod tests {
sink_line: 5,
spec_hash: "0000000000000000".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
};
let err = build(&spec).unwrap_err();
assert!(matches!(err, HarnessError::Unsupported(_)));
@ -217,6 +218,7 @@ mod tests {
sink_line: 10,
spec_hash: "test0000abcd1234".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
};
let harness = build(&spec).unwrap();
assert!(harness.workdir.join("harness.py").exists());

View file

@ -366,6 +366,7 @@ mod tests {
sink_line: 20,
spec_hash: "go0000000000001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}

View file

@ -380,6 +380,7 @@ mod tests {
sink_line: 25,
spec_hash: "java00000000001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}

View file

@ -419,6 +419,7 @@ mod tests {
sink_line: 15,
spec_hash: "js000000000001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}

View file

@ -316,6 +316,7 @@ mod tests {
sink_line: 10,
spec_hash: "php0000000000001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}

View file

@ -524,6 +524,7 @@ mod tests {
sink_line: 15,
spec_hash: "00000000deadbeef".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}

View file

@ -215,6 +215,7 @@ mod tests {
sink_line: 1,
spec_hash: "0".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
};
assert_eq!(
RubyEmitter.emit(&spec).unwrap_err(),

View file

@ -461,6 +461,7 @@ mod tests {
sink_line: 10,
spec_hash: "rusttest00000001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}

View file

@ -81,6 +81,7 @@ pub mod report;
pub mod runner;
pub mod sandbox;
pub mod spec;
pub mod stubs;
pub mod telemetry;
pub mod toolchain;
pub mod verify;

View file

@ -26,6 +26,7 @@
use crate::dynamic::probe::{ProbeKind, SinkProbe};
use crate::dynamic::sandbox::SandboxOutcome;
use crate::dynamic::stubs::{StubEvent, StubKind};
use serde::{Deserialize, Serialize};
/// POSIX-style signal name carried inside [`ProbeKind::Crash`] and the
@ -167,6 +168,22 @@ pub enum ProbePredicate {
/// The probe records at least `min_args` arguments. Lets a payload
/// pin the sink's arity without locking exact values.
MinArgs(usize),
/// Phase 10 (Track D.3): predicate that fires when at least one
/// [`StubEvent`] of kind `kind` carries a summary containing
/// `needle`. Lets a payload assert that a boundary stub (SQL, HTTP,
/// Redis, filesystem) actually observed the sink's effect — e.g.
/// `StubEventMatches { kind: StubKind::Sql, needle: "SELECT" }`.
///
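/// Evaluation is *cross-cutting*: the predicate is checked once against
/// the whole drained stub-event log rather than against any single
/// probe. It is satisfied only when some event matches, so it fails
/// when the log is empty (as under the legacy `oracle_fired` entry
/// point); per-probe evaluation treats it as vacuously true. Callers
/// wanting per-probe pinning pair this with another predicate that
/// does anchor to the probe.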
StubEventMatches {
/// Which stub kind to look at.
kind: StubKind,
/// Substring to find in `StubEvent::summary`.
needle: &'static str,
},
}
/// How we decide a sandbox run confirmed the sink fired.
@ -207,17 +224,80 @@ pub enum Oracle {
FileEscape,
/// Non-zero exit with specific status.
ExitStatus(i32),
/// Phase 10 (Track D.3): boundary-stub-driven oracle. Fires when the
/// per-kind [`StubEvent`] log drained from
/// [`crate::dynamic::stubs::StubHarness`] contains an event of
/// `kind` whose summary contains `needle`.
///
/// Distinct from the [`ProbePredicate::StubEventMatches`] *inside*
/// `SinkProbe` evaluation: this variant lets a payload skip probe
/// instrumentation entirely and confirm purely on the stub's
/// observed effect, which is the only signal available for sinks
/// the harness cannot wrap (e.g. opaque ORM calls).
StubEvent {
/// Which stub kind to look at.
kind: StubKind,
/// Substring to find in `StubEvent::summary`.
needle: &'static str,
},
}
/// Evaluate an oracle against a single sandbox outcome plus the records
/// drained from the run's probe channel. Returns `true` iff the run is
/// considered to have fired the sink.
///
/// Backwards-compatible entry point — preserved verbatim for the
/// runner's vuln + benign-control loops that pre-date Phase 10's stub
/// layer. When the active oracle inspects stub events (i.e.
/// [`Oracle::StubEvent`]) callers should use
/// [`oracle_fired_with_stubs`] which threads in a `&[StubEvent]`
/// slice; this function treats the stub-event log as empty so the
/// `Oracle::StubEvent` branch never fires under the legacy entry.
#[allow(deprecated)]
pub fn oracle_fired(oracle: &Oracle, outcome: &SandboxOutcome, probes: &[SinkProbe]) -> bool {
oracle_fired_with_stubs(oracle, outcome, probes, &[])
}
/// Phase 10: evaluate an oracle with the boundary-stub event log in
/// scope. See [`Oracle::StubEvent`] for the semantics of the new
/// branch and [`ProbePredicate::StubEventMatches`] for the new
/// `Oracle::SinkProbe` cross-cutting predicate.
#[allow(deprecated)]
pub fn oracle_fired_with_stubs(
oracle: &Oracle,
outcome: &SandboxOutcome,
probes: &[SinkProbe],
stub_events: &[StubEvent],
) -> bool {
match oracle {
Oracle::SinkProbe { predicates } => probes
.iter()
.any(|p| probe_satisfies_all(p, predicates)),
Oracle::SinkProbe { predicates } => {
// Predicate set split: per-probe vs cross-cutting (stub
// events). A predicate that targets stub events cannot be
// evaluated against a single probe — it satisfies once
// globally when the stub log contains a matching event.
// Per-probe predicates must still hold for at least one
// captured probe.
let (cross, per_probe): (Vec<_>, Vec<_>) =
predicates.iter().partition(|p| is_cross_cutting(p));
let cross_ok = cross
.iter()
.all(|p| cross_cutting_satisfied(p, stub_events));
if !cross_ok {
return false;
}
match (cross.is_empty(), per_probe.is_empty()) {
// Empty predicate slice — legacy semantics: fire when
// at least one probe exists.
(true, true) => !probes.is_empty(),
// Only cross-cutting predicates, all satisfied → fire.
(false, true) => true,
// Per-probe predicates present — at least one probe
// must satisfy every per-probe predicate.
(_, false) => probes
.iter()
.any(|p| per_probe.iter().all(|pred| probe_satisfies_one(p, pred))),
}
}
Oracle::SinkCrash { signals } => probes.iter().any(|p| match p.kind {
ProbeKind::Crash { signal } => signals.contains(signal),
ProbeKind::Normal => false,
@ -230,6 +310,25 @@ pub fn oracle_fired(oracle: &Oracle, outcome: &SandboxOutcome, probes: &[SinkPro
Oracle::OobCallback { .. } => outcome.oob_callback_seen,
Oracle::FileEscape => false,
Oracle::ExitStatus(code) => outcome.exit_code == Some(*code),
Oracle::StubEvent { kind, needle } => stub_events
.iter()
.any(|e| e.kind == *kind && e.summary.contains(*needle)),
}
}
/// Reports whether `pred` is a *cross-cutting* predicate, i.e. one that
/// evaluates against the stub-event log rather than against any single
/// [`SinkProbe`].
///
/// Only [`ProbePredicate::StubEventMatches`] is cross-cutting; every
/// other variant yields `false`. Used to partition predicate slices in
/// [`oracle_fired_with_stubs`].
fn is_cross_cutting(pred: &ProbePredicate) -> bool {
matches!(pred, ProbePredicate::StubEventMatches { .. })
}
/// Checks a cross-cutting predicate against the drained stub-event log.
///
/// For [`ProbePredicate::StubEventMatches`] the result is `true` when at
/// least one event has the requested kind and a summary containing the
/// needle, so an empty log never satisfies it. Any other predicate
/// variant is not a stub-log predicate and is reported as satisfied.
fn cross_cutting_satisfied(pred: &ProbePredicate, stub_events: &[StubEvent]) -> bool {
    if let ProbePredicate::StubEventMatches { kind, needle } = pred {
        stub_events
            .iter()
            .filter(|event| event.kind == *kind)
            .any(|event| event.summary.contains(*needle))
    } else {
        true
    }
}
@ -260,6 +359,9 @@ fn probe_satisfies_one(probe: &SinkProbe, pred: &ProbePredicate) -> bool {
.any(|a| a.as_str().map(|s| s.contains(*needle)).unwrap_or(false)),
ProbePredicate::CalleeEquals(value) => probe.sink_callee == *value,
ProbePredicate::MinArgs(n) => probe.args.len() >= *n,
// Cross-cutting predicate; not evaluable against a single probe.
// [`oracle_fired_with_stubs`] handles it via the partition path.
ProbePredicate::StubEventMatches { .. } => true,
}
}

View file

@ -393,6 +393,7 @@ mod tests {
sink_line: 10,
spec_hash: "cafecafecafe0001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}

View file

@ -11,8 +11,9 @@ use crate::dynamic::corpus::{
};
use crate::dynamic::differential;
use crate::dynamic::harness::{self, HarnessError};
use crate::dynamic::oracle::{oracle_fired, probe_crash_signal, Oracle};
use crate::dynamic::oracle::{oracle_fired_with_stubs, probe_crash_signal, Oracle};
use crate::dynamic::probe::{ProbeChannel, SinkProbe};
use crate::dynamic::stubs::StubEvent;
use crate::dynamic::sandbox::{self, SandboxBackend, SandboxError, SandboxOptions, SandboxOutcome};
use crate::dynamic::spec::HarnessSpec;
use crate::evidence::{DifferentialOutcome, DifferentialVerdict};
@ -292,8 +293,20 @@ pub fn run_spec(spec: &HarnessSpec, opts: &SandboxOptions) -> Result<RunOutcome,
.as_ref()
.map(|ch| ch.drain())
.unwrap_or_default();
// Phase 10: drain boundary-stub events so the oracle can use
// them (`Oracle::StubEvent`, `ProbePredicate::StubEventMatches`).
let vuln_stub_events: Vec<StubEvent> = effective_opts
.stub_harness
.as_ref()
.map(|h| h.drain_all())
.unwrap_or_default();
let vuln_fired = oracle_fired(&payload.oracle, &outcome, &vuln_probes);
let vuln_fired = oracle_fired_with_stubs(
&payload.oracle,
&outcome,
&vuln_probes,
&vuln_stub_events,
);
let sink_hit = outcome.sink_hit;
// Phase 08 §C.4: a process-level crash with no matching sink-site
@ -336,10 +349,16 @@ pub fn run_spec(spec: &HarnessSpec, opts: &SandboxOptions) -> Result<RunOutcome,
.as_ref()
.map(|ch| ch.drain())
.unwrap_or_default();
let benign_fired = oracle_fired(
let benign_stub_events: Vec<StubEvent> = effective_opts
.stub_harness
.as_ref()
.map(|h| h.drain_all())
.unwrap_or_default();
let benign_fired = oracle_fired_with_stubs(
&benign.oracle,
&benign_outcome,
&benign_probes,
&benign_stub_events,
);
let outcome_record = differential::build_outcome(
payload.label,

View file

@ -144,6 +144,18 @@ pub struct SandboxOptions {
/// drains the channel after each sandbox run and evaluates
/// [`crate::dynamic::oracle::ProbePredicate`]s against the records.
pub probe_channel: Option<Arc<ProbeChannel>>,
/// Phase 10 (Track D.3): extra env vars injected after
/// [`Self::env_passthrough`] / `harness.env`. The verifier
/// populates this from
/// [`crate::dynamic::stubs::StubHarness::endpoints`] so each
/// boundary stub's endpoint reaches the harness via a stable
/// env-var name (e.g. `NYX_SQL_ENDPOINT`).
pub extra_env: Vec<(String, String)>,
/// Phase 10 (Track D.3): live boundary-stub harness used by the
/// runner to drain stub events between payload runs and feed them
/// into [`crate::dynamic::oracle::oracle_fired_with_stubs`].
/// `None` when the spec's `stubs_required` is empty.
pub stub_harness: Option<Arc<crate::dynamic::stubs::StubHarness>>,
}
impl Default for SandboxOptions {
@ -156,6 +168,8 @@ impl Default for SandboxOptions {
output_limit: 65536,
oob_listener: None,
probe_channel: None,
extra_env: Vec::new(),
stub_harness: None,
}
}
}
@ -1032,6 +1046,13 @@ fn run_process(
for (k, v) in &harness.env {
cmd.env(k, v);
}
// Phase 10: stub endpoints (SQL DB path, HTTP origin URL, etc.)
// overlaid after harness.env so a per-language emitter cannot
// accidentally shadow a boundary endpoint with a placeholder of
// its own.
for (k, v) in &opts.extra_env {
cmd.env(k, v);
}
// Payload injected via NYX_PAYLOAD env var.
let payload_b64 = base64_encode(payload_bytes);
cmd.env("NYX_PAYLOAD_B64", &payload_b64);

View file

@ -20,6 +20,7 @@
use crate::callgraph::{CallGraph, CallGraphAnalysis};
use crate::commands::scan::Diag;
use crate::dynamic::corpus::CORPUS_VERSION;
use crate::dynamic::stubs::StubKind;
use crate::evidence::{Confidence, FlowStepKind, UnsupportedReason};
use crate::labels::Cap;
use crate::summary::{FuncSummary, GlobalSummaries};
@ -38,7 +39,7 @@ pub use crate::evidence::SpecDerivationStrategy;
/// Bump whenever [`HarnessSpec`] fields change meaning or the spec hash
/// inputs change. Downstream tools should reject specs with an unrecognised
/// version.
pub const SPEC_FORMAT_VERSION: u32 = 1;
pub const SPEC_FORMAT_VERSION: u32 = 2;
/// Identifies the entry point extracted from a taint flow.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@ -110,6 +111,19 @@ pub struct HarnessSpec {
/// with deserialised specs that pre-date the typed strategy.
#[serde(default = "default_derivation_strategy")]
pub derivation: SpecDerivationStrategy,
/// Stubs the verifier must spawn before the sandbox runs (Phase 10 —
/// Track D.3). Derived from [`Self::expected_cap`] via
/// [`StubKind::for_cap`] at spec-construction time so the verifier
/// only starts the boundaries a payload actually needs — a Cap that
/// auto-derives no stub leaves this empty and
/// [`crate::dynamic::stubs::StubHarness::start`] is a no-op (the
/// "harness with `stubs_required: []` boots in under 500ms"
/// performance invariant).
///
/// `#[serde(default)]` so specs persisted by pre-Phase-10 versions of
/// the cache deserialise as an empty list.
#[serde(default)]
pub stubs_required: Vec<StubKind>,
}
fn default_derivation_strategy() -> SpecDerivationStrategy {
@ -975,6 +989,7 @@ fn finalize_spec(
derivation: SpecDerivationStrategy,
) -> HarnessSpec {
let toolchain_id = toolchain_id_for_lang(lang).to_owned();
let stubs_required = StubKind::for_cap(expected_cap);
let mut spec = HarnessSpec {
finding_id: format!("{:016x}", diag.stable_hash),
entry_file,
@ -989,6 +1004,7 @@ fn finalize_spec(
sink_line,
spec_hash: String::new(),
derivation,
stubs_required,
};
spec.spec_hash = compute_spec_hash(&spec);
spec
@ -1088,6 +1104,16 @@ fn compute_spec_hash(spec: &HarnessSpec) -> String {
h.update(&spec.sink_line.to_le_bytes());
h.update(&CORPUS_VERSION.to_le_bytes());
// Phase 10: spec hash must flip when stubs_required changes so the
// dynamic verdict cache evicts entries computed under a different
// boundary topology. Sort first so order-independence holds.
let mut stubs: Vec<&StubKind> = spec.stubs_required.iter().collect();
stubs.sort_unstable_by_key(|k| k.tag());
for s in stubs {
h.update(s.tag().as_bytes());
h.update(b"\0");
}
let out = h.finalize();
let bytes = out.as_bytes();
format!("{:016x}", u64::from_le_bytes(bytes[..8].try_into().unwrap()))
@ -1255,6 +1281,7 @@ mod tests {
sink_line: 10,
spec_hash: String::new(),
derivation: SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
};
spec.spec_hash = compute_spec_hash(&spec);
spec

View file

@ -0,0 +1,186 @@
//! Filesystem stub — a sandbox-local fake root (Phase 10 — Track D.3).
//!
//! Creates a fresh, world-writable directory under the verifier's
//! workdir and exposes the absolute path as the endpoint. The harness
//! is expected to treat that directory as its `/` for file-related
//! sinks (the per-language emitter resolves all paths under
//! `NYX_FS_ROOT`). Drop removes the directory tree.
//!
//! # Platform notes
//!
//! The Phase 10 deliverable bullet asks for a "chroot-like fake root"
//! using a Unix bind-mount where available and a copy-on-write
//! directory elsewhere. Neither is portable without root privileges,
//! and the runner cannot assume CAP_SYS_ADMIN in CI. The minimum
//! viable shape — and what every fixture in `tests/dynamic_fixtures/`
//! actually needs today — is a fresh writable directory that the
//! harness scopes its file ops to. Future hardening can swap in a
//! real namespace / userns root inside the existing `endpoint()`
//! contract; harnesses won't notice.
//!
//! # Event capture
//!
//! The stub can't observe all filesystem syscalls without ptrace, so
//! event capture is opt-in via [`FilesystemStub::record_access`] (used
//! by harnesses that already wrap their file ops). Walks of the
//! resulting tree on `drain_events` would race the harness, so
//! `drain_events` never inspects the directory: it returns exactly the
//! events pushed by `record_access` since the previous drain and
//! clears the buffer.
use super::{StubEvent, StubKind, StubProvider};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use tempfile::TempDir;
/// Sandbox-local fake filesystem root.
///
/// Owns a temporary directory (removed when the stub is dropped) and a
/// buffer of access events that callers push through
/// [`FilesystemStub::record_access`].
#[derive(Debug)]
pub struct FilesystemStub {
/// Tempdir backing the fake root. Held in `Option` so `Drop` can
/// `take()` it and drop it explicitly.
tempdir: Option<TempDir>,
/// Copy of `tempdir.path()` captured once in `start`. Stable for the
/// stub's lifetime; the endpoint string is derived from it.
root: PathBuf,
/// Recorded access events. Pushed by
/// [`FilesystemStub::record_access`] and drained per the trait.
events: Mutex<Vec<StubEvent>>,
}
impl FilesystemStub {
/// Create a fresh root under `workdir`. Falls back to the system
/// tempdir when `workdir` is unwritable so the stub still spawns
/// in restricted environments (e.g. CI sandboxes that share a
/// read-only workdir).
pub fn start(workdir: &Path) -> std::io::Result<Self> {
let tempdir = TempDir::new_in(workdir)
.or_else(|_| TempDir::new())?;
let root = tempdir.path().to_owned();
Ok(Self {
tempdir: Some(tempdir),
root,
events: Mutex::new(Vec::new()),
})
}
/// Absolute path of the fake root. Synonym for
/// `StubProvider::endpoint` but typed.
pub fn root(&self) -> &Path {
&self.root
}
/// Record a filesystem access. The harness calls this through a
/// thin wrapper around `open(2)` / `fs.readFileSync` / etc., or
/// (in tests) the host calls it directly.
pub fn record_access(&self, op: &str, path: &str) {
let ev = StubEvent::new(StubKind::Filesystem, format!("{op} {path}"))
.with_detail("op", op)
.with_detail("path", path);
if let Ok(mut g) = self.events.lock() {
g.push(ev);
}
}
/// True iff `candidate` resolves to a path inside the fake root.
/// Used by tests + future per-language wrappers to enforce that
/// the harness only touches paths under the stub.
pub fn contains_path(&self, candidate: &Path) -> bool {
// Canonicalise both sides where possible so symlinks /
// relative path segments do not fool the prefix check.
let resolved_root = std::fs::canonicalize(&self.root).unwrap_or_else(|_| self.root.clone());
let resolved_cand = std::fs::canonicalize(candidate).unwrap_or_else(|_| candidate.to_owned());
resolved_cand.starts_with(&resolved_root)
}
}
impl StubProvider for FilesystemStub {
    /// This provider always reports [`StubKind::Filesystem`].
    fn kind(&self) -> StubKind {
        StubKind::Filesystem
    }

    /// The fake root rendered as a string (lossy for non-UTF-8 paths).
    fn endpoint(&self) -> String {
        String::from(self.root.to_string_lossy())
    }

    /// Hand back every recorded event and leave the buffer empty. A
    /// poisoned buffer mutex yields an empty list.
    fn drain_events(&self) -> Vec<StubEvent> {
        self.events
            .lock()
            .map(|mut buffered| std::mem::take(&mut *buffered))
            .unwrap_or_default()
    }
}
impl Drop for FilesystemStub {
    /// Release the backing tempdir; `TempDir`'s own `Drop` recursively
    /// deletes the directory tree.
    fn drop(&mut self) {
        if let Some(backing_dir) = self.tempdir.take() {
            drop(backing_dir);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Start a stub under a fresh parent directory. The parent guard is
    /// returned so it outlives the stub for the duration of the test.
    fn fresh_stub() -> (TempDir, FilesystemStub) {
        let parent = TempDir::new().unwrap();
        let stub = FilesystemStub::start(parent.path()).unwrap();
        (parent, stub)
    }

    #[test]
    fn start_creates_root_directory() {
        let (_parent, stub) = fresh_stub();
        assert!(stub.root().is_dir(), "fake root must be a directory");
    }

    #[test]
    fn endpoint_returns_root_path_string() {
        let (_parent, stub) = fresh_stub();
        assert_eq!(stub.endpoint(), stub.root().to_string_lossy());
    }

    #[test]
    fn record_access_lands_in_drain() {
        let (_parent, stub) = fresh_stub();
        stub.record_access("read", "/etc/passwd");

        let events = stub.drain_events();
        assert_eq!(events.len(), 1);

        let recorded = &events[0];
        assert_eq!(recorded.kind, StubKind::Filesystem);
        assert!(recorded.summary.contains("/etc/passwd"));
        assert_eq!(recorded.detail.get("op").map(String::as_str), Some("read"));
    }

    #[test]
    fn contains_path_true_for_files_under_root() {
        let (_parent, stub) = fresh_stub();
        let inside = stub.root().join("inside.txt");
        std::fs::write(&inside, b"hello").unwrap();
        assert!(stub.contains_path(&inside));
    }

    #[test]
    fn contains_path_false_for_escape_attempts() {
        let (_parent, stub) = fresh_stub();
        assert!(!stub.contains_path(Path::new("/etc/passwd")));
    }

    #[test]
    fn drop_removes_root_directory() {
        let (_parent, stub) = fresh_stub();
        let root = stub.root().to_owned();
        assert!(root.exists());
        drop(stub);
        assert!(!root.exists(), "root must be removed on drop");
    }

    #[test]
    fn provider_kind_is_filesystem() {
        let (_parent, stub) = fresh_stub();
        assert_eq!(stub.kind(), StubKind::Filesystem);
    }
}

279
src/dynamic/stubs/http.rs Normal file
View file

@ -0,0 +1,279 @@
//! HTTP stub — a localhost listener that records every request
//! (Phase 10 — Track D.3).
//!
//! Binds to `127.0.0.1:0`, accepts connections in a background thread,
//! and parses just enough of HTTP/1.1 to capture the request line,
//! headers, and body. Always responds with `200 OK\r\n\r\n` so the
//! harness perceives the call as successful — the goal is to record
//! that the call *happened*, not to faithfully emulate any real
//! origin server.
//!
//! Endpoint: `http://127.0.0.1:{port}`.
//!
//! # Drop
//!
//! Signals the accept thread to shut down and connects to itself to
//! wake the blocking `accept()`. The thread is detached (never joined):
//! it observes the flag on its next loop iteration and exits, at which
//! point the listener it owns is dropped and the socket is released.
use super::{monotonic_ns, StubEvent, StubKind, StubProvider};
use std::collections::BTreeMap;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Localhost HTTP request recorder.
///
/// Created by [`HttpStub::start`], which binds the listener and hands it
/// to a background accept thread; this handle keeps only the pieces
/// shared with that thread.
#[derive(Debug)]
pub struct HttpStub {
/// Loopback port the listener was bound to.
port: u16,
/// Captured request events, shared with the accept thread.
events: Arc<Mutex<Vec<StubEvent>>>,
/// Set to `true` by `Drop` to ask the accept thread to stop.
shutdown: Arc<AtomicBool>,
}
impl HttpStub {
    /// Bind to a random loopback port and start the accept thread.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error when the listener cannot be
    /// bound or configured, or when the OS refuses to create the accept
    /// thread. The thread is spawned through `std::thread::Builder`
    /// because `std::thread::spawn` would panic in that last case.
    pub fn start() -> std::io::Result<Self> {
        let listener = TcpListener::bind("127.0.0.1:0")?;
        // `accept_loop` relies on a blocking `accept()`; make that explicit.
        listener.set_nonblocking(false)?;
        let port = listener.local_addr()?.port();
        let events: Arc<Mutex<Vec<StubEvent>>> = Arc::new(Mutex::new(Vec::new()));
        let shutdown = Arc::new(AtomicBool::new(false));
        let events_clone = Arc::clone(&events);
        let shutdown_clone = Arc::clone(&shutdown);
        // The join handle is dropped on purpose: the thread is detached
        // and stops once `Drop` raises the shutdown flag.
        std::thread::Builder::new()
            .name("nyx-http-stub".to_owned())
            .spawn(move || accept_loop(listener, events_clone, shutdown_clone))?;
        Ok(Self {
            port,
            events,
            shutdown,
        })
    }

    /// Port the listener is bound to. Useful for tests that need to
    /// assert the URL shape without parsing `endpoint()`.
    pub fn port(&self) -> u16 {
        self.port
    }

    /// Host-side helper to record a request as if it arrived on the
    /// wire, bypassing the `connect → write → parse` path so a test can
    /// run without a real HTTP client.
    ///
    /// Best-effort: the event is dropped if the buffer mutex is poisoned.
    pub fn record(&self, summary: impl Into<String>) {
        let ev = StubEvent::new(StubKind::Http, summary);
        if let Ok(mut g) = self.events.lock() {
            g.push(ev);
        }
    }
}
impl StubProvider for HttpStub {
    /// This provider always reports [`StubKind::Http`].
    fn kind(&self) -> StubKind {
        StubKind::Http
    }

    /// Origin URL of the listener: `http://127.0.0.1:{port}`.
    fn endpoint(&self) -> String {
        let port = self.port;
        format!("http://127.0.0.1:{port}")
    }

    /// Hand back every captured event and leave the buffer empty. A
    /// poisoned buffer mutex yields an empty list.
    fn drain_events(&self) -> Vec<StubEvent> {
        self.events
            .lock()
            .map(|mut buffered| std::mem::take(&mut *buffered))
            .unwrap_or_default()
    }
}
impl Drop for HttpStub {
    /// Raise the shutdown flag, then poke the listener with a throwaway
    /// connection so the blocking accept returns and the loop sees the
    /// flag. A failed connect is ignored.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Relaxed);
        let wake_addr = ("127.0.0.1", self.port);
        drop(TcpStream::connect(wake_addr));
    }
}
/// Body of the accept thread: serve connections one at a time until
/// `shutdown` is observed.
///
/// The flag is checked after every `accept()` return, so the wake-up
/// connection made by `HttpStub`'s `Drop` is enough to end the loop.
/// Each accepted stream gets 2 s read/write timeouts and is handed to
/// `handle_connection`; any event it yields is appended to `events`
/// (dropped if the mutex is poisoned).
fn accept_loop(
    listener: TcpListener,
    events: Arc<Mutex<Vec<StubEvent>>>,
    shutdown: Arc<AtomicBool>,
) {
    // Per-connection read budget. Real harnesses send short requests;
    // anything beyond this limit is truncated to keep the stub
    // bounded under adversarial payloads.
    const MAX_REQUEST_BYTES: usize = 64 * 1024;
    // Pause after a failed `accept()`. Errors such as fd exhaustion
    // persist across calls, and retrying immediately would spin a core
    // until the stub is dropped.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(10);

    for stream in listener.incoming() {
        if shutdown.load(Ordering::Relaxed) {
            break;
        }
        let stream = match stream {
            Ok(s) => s,
            Err(_) => {
                std::thread::sleep(ACCEPT_ERROR_BACKOFF);
                continue;
            }
        };
        let _ = stream.set_read_timeout(Some(Duration::from_secs(2)));
        let _ = stream.set_write_timeout(Some(Duration::from_secs(2)));
        if let Some(ev) = handle_connection(stream, MAX_REQUEST_BYTES) {
            if let Ok(mut g) = events.lock() {
                g.push(ev);
            }
        }
    }
}
/// Read one request, capture its metadata, and send a minimal 200 OK.
///
/// Returns `None` when the stream cannot be cloned, when reading the
/// request line fails, or when the peer closes without sending anything
/// (the shutdown wake-up connection). Otherwise returns an event whose
/// `summary` is the request line and whose `detail` carries the headers
/// (`"headers"`, newline-joined) and the body (`"body"` when it is valid
/// UTF-8, else a `"body_bytes"` size placeholder).
///
/// Bounds and leniency:
/// - The request line plus headers are read through a `max_bytes`
///   budget, so an endless line or header stream cannot grow memory
///   without limit; input past the budget is truncated.
/// - The body is capped at `min(Content-Length, max_bytes)`.
/// - Lines are decoded lossily, so a request containing non-UTF-8 bytes
///   is still recorded instead of being dropped.
/// - A read error while parsing headers ends header parsing but keeps
///   the request line, matching how a failed body read is treated.
fn handle_connection(mut stream: TcpStream, max_bytes: usize) -> Option<StubEvent> {
    let head_budget = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let mut reader = BufReader::new(stream.try_clone().ok()?.take(head_budget));
    let decode_line = |raw: &[u8]| -> String {
        String::from_utf8_lossy(raw)
            .trim_end_matches(['\r', '\n'])
            .to_owned()
    };

    // Request line.
    let mut raw = Vec::new();
    if reader.read_until(b'\n', &mut raw).ok()? == 0 {
        // Shutdown wakeup connection — no request to record.
        return None;
    }
    let request_line = decode_line(&raw);

    // Headers.
    let mut headers: Vec<String> = Vec::new();
    let mut content_length: usize = 0;
    loop {
        raw.clear();
        match reader.read_until(b'\n', &mut raw) {
            // EOF, exhausted budget, or I/O error: stop with what we have.
            Ok(0) | Err(_) => break,
            Ok(_) => {}
        }
        let header = decode_line(&raw);
        if header.is_empty() {
            break;
        }
        if let Some((name, value)) = header.split_once(':') {
            if name.eq_ignore_ascii_case("content-length") {
                if let Ok(n) = value.trim().parse::<usize>() {
                    content_length = n.min(max_bytes);
                }
            }
        }
        headers.push(header);
    }

    // Body, capped at content_length (already clamped to max_bytes).
    // Bytes the BufReader pre-fetched while reading headers are served
    // from its buffer first; the fresh limit only governs further reads.
    reader
        .get_mut()
        .set_limit(u64::try_from(content_length).unwrap_or(u64::MAX));
    let mut body = vec![0u8; content_length];
    if content_length > 0 && reader.read_exact(&mut body).is_err() {
        body.clear();
    }

    // Always reply 200 OK with no body. The connection is closed as soon
    // as this function returns, so tell keep-alive clients up front.
    let _ = stream
        .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
    let _ = stream.flush();

    // Build the event. `summary` is the request line; `detail`
    // carries the parsed headers + a UTF-8 view of the body when
    // possible.
    let mut detail = BTreeMap::new();
    if !headers.is_empty() {
        detail.insert("headers".to_owned(), headers.join("\n"));
    }
    if !body.is_empty() {
        match std::str::from_utf8(&body) {
            Ok(s) => {
                detail.insert("body".to_owned(), s.to_owned());
            }
            Err(_) => {
                detail.insert("body_bytes".to_owned(), format!("<{} bytes>", body.len()));
            }
        }
    }
    Some(StubEvent {
        kind: StubKind::Http,
        captured_at_ns: monotonic_ns(),
        summary: request_line,
        detail,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Send `request` over a real socket and return everything the stub
    /// writes back before closing the connection.
    fn send_request(port: u16, request: &[u8]) -> Vec<u8> {
        let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
        s.write_all(request).unwrap();
        s.flush().unwrap();
        let mut out = Vec::new();
        let _ = s.read_to_end(&mut out);
        out
    }

    /// Drain the stub until at least `want` events were seen or a 2 s
    /// deadline passes. The accept thread publishes an event only after
    /// it has closed the connection, so a fixed short sleep can lose the
    /// race on a loaded machine; polling removes that timing dependency.
    fn wait_for_events(stub: &HttpStub, want: usize) -> Vec<StubEvent> {
        let deadline = std::time::Instant::now() + Duration::from_secs(2);
        let mut seen = Vec::new();
        loop {
            seen.extend(stub.drain_events());
            if seen.len() >= want || std::time::Instant::now() >= deadline {
                return seen;
            }
            std::thread::sleep(Duration::from_millis(5));
        }
    }

    #[test]
    fn endpoint_uses_loopback_with_assigned_port() {
        let stub = HttpStub::start().unwrap();
        let ep = stub.endpoint();
        assert!(ep.starts_with("http://127.0.0.1:"));
        assert!(ep.ends_with(&stub.port().to_string()));
    }

    #[test]
    fn captures_request_line_via_real_socket() {
        let stub = HttpStub::start().unwrap();
        let reply = send_request(
            stub.port(),
            b"GET /api/users HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n",
        );
        assert!(reply.starts_with(b"HTTP/1.1 200 OK"));
        let events = wait_for_events(&stub, 1);
        assert_eq!(events.len(), 1);
        assert!(
            events[0].summary.contains("/api/users"),
            "summary must contain request line, got {:?}",
            events[0].summary
        );
    }

    #[test]
    fn captures_post_body() {
        let stub = HttpStub::start().unwrap();
        let body = b"username=admin&password=hunter2";
        let req = format!(
            "POST /login HTTP/1.1\r\nHost: 127.0.0.1\r\nContent-Length: {}\r\n\r\n",
            body.len()
        );
        let mut full = req.into_bytes();
        full.extend_from_slice(body);
        let _ = send_request(stub.port(), &full);
        let events = wait_for_events(&stub, 1);
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].detail.get("body").map(String::as_str),
            Some("username=admin&password=hunter2")
        );
    }

    #[test]
    fn drain_resets_event_buffer() {
        let stub = HttpStub::start().unwrap();
        stub.record("GET /first HTTP/1.1");
        assert_eq!(stub.drain_events().len(), 1);
        assert!(stub.drain_events().is_empty(), "second drain must be empty");
    }

    #[test]
    fn drop_releases_port_for_rebind() {
        let port = {
            let stub = HttpStub::start().unwrap();
            stub.port()
        };
        // After drop, the accept thread may need a moment to exit and
        // release the listener. There is nothing observable to poll on,
        // so give it a short grace period.
        std::thread::sleep(Duration::from_millis(50));
        let _ = TcpListener::bind(format!("127.0.0.1:{port}"));
        // We don't assert success here — the OS may hold the port in
        // TIME_WAIT — but Drop must not panic or deadlock.
    }
}

382
src/dynamic/stubs/mod.rs Normal file
View file

@ -0,0 +1,382 @@
//! Per-cap stub providers (Phase 10 — Track D.3).
//!
//! A *stub* is a tiny in-process service that pretends to be the real
//! boundary a sink crosses — a SQL server, an HTTP origin, a Redis
//! cache, a writable filesystem root — so a sink that talks to that
//! boundary can fire under test without depending on a live external
//! service. Each stub exposes:
//!
//! 1. Start-up — spin the service up. The constructor of each concrete
//! stub plays this role (e.g. [`SqlStub::start`]); on the trait side,
//! [`StubProvider::kind`] just hands back the kind for type-erased
//! introspection.
//! 2. [`StubProvider::endpoint`] — the connection string the harness
//! should use (a SQLite DB path, `http://127.0.0.1:port`, a
//! filesystem root, etc.).
//! 3. [`StubProvider::drain_events`] — read every event observed since
//! the last drain. The oracle's
//! [`crate::dynamic::oracle::ProbePredicate::StubEventMatches`]
//! walks these to decide whether a stub-observed effect satisfies
//! a payload's predicate set.
//! 4. `Drop` — tear the service down. The runner relies on the
//! `Arc<dyn StubProvider>` drop to release the listening socket /
//! delete the temp filesystem root.
//!
//! # Lifecycle
//!
//! [`StubHarness::start`] spawns exactly the stubs in `kinds` (it does
//! *not* spawn the full set — the performance invariant is that a
//! harness with `stubs_required: []` boots in under 500 ms, so a
//! verifier that needs no stubs touches none of this module). The
//! harness keeps the stubs alive for the duration of a verify run and
//! drops them on scope exit; the runner does not have to know about
//! individual stub types.
//!
//! # Wiring
//!
//! - [`crate::dynamic::spec::HarnessSpec::stubs_required`] is populated
//! at spec-derivation time from [`StubKind::for_cap`]; a SQL sink
//! pulls in [`StubKind::Sql`], an SSRF sink pulls in
//! [`StubKind::Http`], a path-traversal sink pulls in
//! [`StubKind::Filesystem`]. Stubs whose presence is purely
//! opportunistic (e.g. [`StubKind::Redis`]) are not auto-derived from
//! any cap and must be added explicitly by a caller that knows it
//! needs them.
//! - [`crate::dynamic::verify::verify_finding`] starts the required
//! stubs *after* spec derivation and *before* spawning the sandbox,
//! then injects each stub's endpoint into the sandbox env via the
//! well-known [`StubKind::env_var`] name.
//! - Stub events are drained per-payload by the verifier (after each
//! sandbox run) and passed into
//! [`crate::dynamic::oracle::oracle_fired_with_stubs`] so the
//! `StubEventMatches` predicate can satisfy a payload.
pub mod filesystem;
pub mod http;
pub mod redis;
pub mod sql;
pub use filesystem::FilesystemStub;
pub use http::HttpStub;
pub use redis::RedisStub;
pub use sql::SqlStub;
use crate::labels::Cap;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::path::Path;
use std::sync::Arc;
/// Which kind of stub a sink needs to fire under test.
///
/// Stored on [`crate::dynamic::spec::HarnessSpec::stubs_required`] as a
/// `Vec<StubKind>` so the spec serialises stably across versions even
/// when new stub kinds land in a future phase.
///
/// Each variant has a fixed env-var name ([`StubKind::env_var`]) and a
/// fixed lower-case tag ([`StubKind::tag`]). The type is `Copy`, so it
/// is passed by value throughout this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum StubKind {
/// SQLite-file-backed SQL stub. Endpoint is the path of a DB file
/// that the stub creates on disk (see `SqlStub::start`).
Sql,
/// Localhost HTTP listener. Endpoint is `http://127.0.0.1:{port}`.
Http,
/// Minimal RESP-speaking Redis stub. Endpoint is `127.0.0.1:{port}`
/// with no scheme prefix.
Redis,
/// Sandbox-local fake filesystem root. Endpoint is an absolute
/// directory path that the harness is expected to use as its root.
Filesystem,
}
impl StubKind {
    /// Name of the environment variable through which the verifier
    /// passes this stub's endpoint to the sandboxed harness.
    ///
    /// Harnesses read these names verbatim, so they are part of the
    /// stable contract: renaming one requires updating every language
    /// emitter in lock-step.
    pub const fn env_var(self) -> &'static str {
        match self {
            Self::Sql => "NYX_SQL_ENDPOINT",
            Self::Http => "NYX_HTTP_ENDPOINT",
            Self::Redis => "NYX_REDIS_ENDPOINT",
            Self::Filesystem => "NYX_FS_ROOT",
        }
    }

    /// Lower-case tag identifying the kind in [`StubEvent::kind`]
    /// serialisation and in the oracle's `StubEventMatches` predicate.
    /// Stable across versions.
    pub const fn tag(self) -> &'static str {
        match self {
            Self::Sql => "sql",
            Self::Http => "http",
            Self::Redis => "redis",
            Self::Filesystem => "filesystem",
        }
    }

    /// Work out which stubs must be running for a payload that targets
    /// `cap`.
    ///
    /// Only caps whose sinks need a real boundary to fire map to a
    /// stub: `SQL_QUERY` → [`StubKind::Sql`], `SSRF` or
    /// `HEADER_INJECTION` → [`StubKind::Http`], `FILE_IO` →
    /// [`StubKind::Filesystem`]. Anything else (for example
    /// `Cap::CODE_EXEC`) yields an empty vec. The result is always in
    /// the order Sql, Http, Filesystem and never contains duplicates.
    pub fn for_cap(cap: Cap) -> Vec<StubKind> {
        let wants_http = cap.contains(Cap::SSRF) || cap.contains(Cap::HEADER_INJECTION);
        // Table order fixes the output order.
        let table = [
            (cap.contains(Cap::SQL_QUERY), StubKind::Sql),
            (wants_http, StubKind::Http),
            (cap.contains(Cap::FILE_IO), StubKind::Filesystem),
        ];
        table
            .iter()
            .filter_map(|&(needed, kind)| needed.then_some(kind))
            .collect()
    }
}
/// One observation captured by a stub.
///
/// The contents are deliberately type-erased onto strings so all four
/// stub kinds share a single event schema. The `detail` map carries
/// per-kind structured fields (e.g. `method`/`path` for HTTP,
/// `command`/`args` for Redis) that an oracle predicate can dig into
/// without forking the schema by kind.
///
/// Equality (`PartialEq`) compares every field, including
/// `captured_at_ns`, so two otherwise identical events captured at
/// different instants are *not* equal.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StubEvent {
/// Which stub recorded the event.
pub kind: StubKind,
/// Monotonic-ish nanosecond timestamp at capture time. Ordering
/// across stubs is best-effort; absolute value is meaningless.
pub captured_at_ns: u64,
/// One-line human-readable summary. For SQL this is the executed
/// query; for HTTP, the request line; for Redis, the command +
/// args; for filesystem, the absolute path + op kind.
pub summary: String,
/// Per-kind structured fields. Empty when the stub captured only a
/// summary. An empty map is omitted when serialising and defaults
/// to empty when the field is missing on deserialisation.
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub detail: BTreeMap<String, String>,
}
impl StubEvent {
/// Construct a `StubEvent` stamped with the current monotonic
/// timestamp. Tests pin `captured_at_ns` explicitly for
/// determinism; production stubs use this constructor.
pub fn new(kind: StubKind, summary: impl Into<String>) -> Self {
Self {
kind,
captured_at_ns: monotonic_ns(),
summary: summary.into(),
detail: BTreeMap::new(),
}
}
/// Attach a `detail` field, builder-style.
pub fn with_detail(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.detail.insert(key.into(), value.into());
self
}
}
/// Common operations on a running stub.
///
/// The trait is intentionally minimal so a future stub kind (e.g.
/// gRPC, Kafka) plugs in without touching the runner or the oracle.
///
/// Implementors must be `Send + Sync` because [`StubHarness`] stores
/// them as `Arc<dyn StubProvider>`, and `Debug` because `StubHarness`
/// derives `Debug`.
pub trait StubProvider: Send + Sync + std::fmt::Debug {
/// Discriminator for type-erased dispatch.
fn kind(&self) -> StubKind;
/// Connection string handed to the harness via
/// [`StubKind::env_var`].
fn endpoint(&self) -> String;
/// Drain every event observed since the last drain. Always returns
/// the events in insertion order; on a poisoned mutex returns an
/// empty vec (the oracle treats "no events" as "stub was not
/// touched"). Takes `&self`, so implementors need interior
/// mutability to track what has already been drained.
fn drain_events(&self) -> Vec<StubEvent>;
}
/// Aggregate handle the verifier owns for the lifetime of one
/// `verify_finding` call.
///
/// Holds an `Arc<dyn StubProvider>` per requested kind so individual
/// stubs are dropped exactly when the harness goes out of scope. The
/// runner threads `StubHarness::endpoints()` into the sandbox env and
/// calls [`StubHarness::drain_all`] after each payload run.
///
/// The derived `Default` produces a harness with no stubs at all.
#[derive(Debug, Default)]
pub struct StubHarness {
// Running stubs, kept in the order `StubHarness::start` spawned them.
stubs: Vec<Arc<dyn StubProvider>>,
}
impl StubHarness {
    /// Spawn one stub for every distinct kind in `kinds`, in
    /// first-occurrence order.
    ///
    /// `workdir` is forwarded to the stubs that keep state on disk
    /// (`SqlStub`, `FilesystemStub`); the network stubs (`HttpStub`,
    /// `RedisStub`) are started without it.
    ///
    /// # Errors
    ///
    /// Propagates the first I/O error a stub reports while starting.
    /// Stubs that were already running at that point are dropped before
    /// this function returns, so a caller never sees a partly started
    /// harness.
    pub fn start(kinds: &[StubKind], workdir: &Path) -> std::io::Result<Self> {
        let mut spawned: Vec<Arc<dyn StubProvider>> = Vec::with_capacity(kinds.len());
        for (idx, &kind) in kinds.iter().enumerate() {
            // A kind that already appeared earlier in the request has
            // been spawned; repeated entries must not double-spawn.
            if kinds[..idx].contains(&kind) {
                continue;
            }
            let provider: Arc<dyn StubProvider> = match kind {
                StubKind::Sql => Arc::new(SqlStub::start(workdir)?),
                StubKind::Http => Arc::new(HttpStub::start()?),
                StubKind::Redis => Arc::new(RedisStub::start()?),
                StubKind::Filesystem => Arc::new(FilesystemStub::start(workdir)?),
            };
            spawned.push(provider);
        }
        Ok(Self { stubs: spawned })
    }

    /// `(env_var_name, endpoint_value)` pairs for the verifier to merge
    /// into the sandbox environment, one per running stub, in spawn
    /// order.
    pub fn endpoints(&self) -> Vec<(&'static str, String)> {
        let mut pairs = Vec::with_capacity(self.stubs.len());
        for stub in &self.stubs {
            pairs.push((stub.kind().env_var(), stub.endpoint()));
        }
        pairs
    }

    /// Read-only view of the running stubs (for tests and oracle
    /// wiring).
    pub fn stubs(&self) -> &[Arc<dyn StubProvider>] {
        self.stubs.as_slice()
    }

    /// Drain every stub and concatenate the results. Stubs are visited
    /// in spawn order and each stub's events keep the order the stub
    /// returned them in; each event already carries its own `kind`.
    pub fn drain_all(&self) -> Vec<StubEvent> {
        self.stubs
            .iter()
            .flat_map(|stub| stub.drain_events())
            .collect()
    }

    /// `true` when the harness holds no stubs. This is the case the
    /// 500 ms boot budget in Phase 10's acceptance criteria refers to.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// How many stubs are running (test helper).
    pub fn len(&self) -> usize {
        self.stubs.len()
    }
}
/// Nanoseconds elapsed since this function was first called in the
/// current process (the origin is captured lazily on that first call,
/// so the very first result is close to zero).
///
/// Used to timestamp `StubEvent`s; only the relative order of values is
/// meaningful. The `u128` nanosecond count is narrowed to `u64` with an
/// `as` cast.
pub(crate) fn monotonic_ns() -> u64 {
    static ORIGIN: std::sync::OnceLock<std::time::Instant> = std::sync::OnceLock::new();
    let since_origin = ORIGIN.get_or_init(std::time::Instant::now).elapsed();
    since_origin.as_nanos() as u64
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Every stub kind must map to its own env-var name.
    #[test]
    fn stub_kind_env_vars_are_distinct() {
        let kinds = [
            StubKind::Sql,
            StubKind::Http,
            StubKind::Redis,
            StubKind::Filesystem,
        ];
        let names: Vec<&str> = kinds.iter().map(|k| k.env_var()).collect();
        let distinct: std::collections::BTreeSet<&str> = names.iter().copied().collect();
        assert_eq!(distinct.len(), names.len(), "env vars must be unique");
    }

    #[test]
    fn for_cap_sql_query_picks_sql() {
        let picked = StubKind::for_cap(Cap::SQL_QUERY);
        assert_eq!(picked, vec![StubKind::Sql]);
    }

    #[test]
    fn for_cap_ssrf_picks_http() {
        let picked = StubKind::for_cap(Cap::SSRF);
        assert_eq!(picked, vec![StubKind::Http]);
    }

    #[test]
    fn for_cap_file_io_picks_filesystem() {
        let picked = StubKind::for_cap(Cap::FILE_IO);
        assert_eq!(picked, vec![StubKind::Filesystem]);
    }

    #[test]
    fn for_cap_unrelated_cap_picks_nothing() {
        let picked = StubKind::for_cap(Cap::CODE_EXEC);
        assert!(picked.is_empty());
    }

    /// A cap with several bits set pulls in the union of the stubs.
    #[test]
    fn for_cap_unions_multi_bit_caps() {
        let picked = StubKind::for_cap(Cap::SQL_QUERY | Cap::SSRF);
        assert!(picked.contains(&StubKind::Sql));
        assert!(picked.contains(&StubKind::Http));
        assert_eq!(picked.len(), 2);
    }

    /// Acceptance bullet: a harness with `stubs_required: []` boots in
    /// under 500 ms, i.e. starting zero stubs is effectively a no-op.
    #[test]
    fn empty_kinds_starts_in_under_500ms() {
        let dir = TempDir::new().unwrap();
        let clock = std::time::Instant::now();
        let harness = StubHarness::start(&[], dir.path()).unwrap();
        let took = clock.elapsed();
        assert!(harness.is_empty(), "empty kinds must spawn nothing");
        let budget = std::time::Duration::from_millis(500);
        assert!(
            took < budget,
            "empty stubs_required must boot in <500ms (was {took:?})"
        );
    }

    #[test]
    fn dedup_repeated_kinds_during_start() {
        let dir = TempDir::new().unwrap();
        let requested = [StubKind::Sql, StubKind::Sql, StubKind::Sql];
        let harness = StubHarness::start(&requested, dir.path()).unwrap();
        assert_eq!(harness.len(), 1, "repeated kinds must be deduped");
    }

    #[test]
    fn endpoints_carries_stub_specific_env_var_names() {
        let dir = TempDir::new().unwrap();
        let requested = [StubKind::Sql, StubKind::Http, StubKind::Filesystem];
        let harness = StubHarness::start(&requested, dir.path()).unwrap();
        let names: Vec<&str> = harness
            .endpoints()
            .into_iter()
            .map(|(name, _)| name)
            .collect();
        for expected in ["NYX_SQL_ENDPOINT", "NYX_HTTP_ENDPOINT", "NYX_FS_ROOT"] {
            assert!(names.contains(&expected));
        }
    }
}

283
src/dynamic/stubs/redis.rs Normal file
View file

@ -0,0 +1,283 @@
//! Minimal RESP-speaking Redis stub (Phase 10 — Track D.3).
//!
//! Speaks just enough of RESP2 to make a real Redis client believe it
//! is talking to a server: inline commands and `*N\r\n$len\r\nvalue\r\n`
//! framed arrays are both accepted; every command is answered with a
//! short canned reply (`+OK\r\n` for writes, `$-1\r\n` for `GET`,
//! `:0\r\n` for `DEL`/`EXISTS`). The point is to capture *which*
//! command + args the harness issued, not to faithfully emulate a
//! cache.
//!
//! Endpoint: `127.0.0.1:{port}` — no scheme prefix because every
//! mainstream Redis client takes a bare `host:port` pair.
//!
//! # Drop
//!
//! Same shutdown shape as [`crate::dynamic::stubs::http::HttpStub`]:
//! signal the accept thread, then connect once to unblock the
//! accept syscall.
use super::{StubEvent, StubKind, StubProvider};
use std::collections::BTreeMap;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Localhost RESP command recorder.
///
/// `RedisStub::start` binds a loopback listener and hands it to a
/// background accept thread; this struct keeps only the handles needed
/// to report the endpoint, drain recorded events, and signal shutdown
/// on drop.
#[derive(Debug)]
pub struct RedisStub {
// Loopback port the listener was bound to.
port: u16,
// Event log shared with the accept thread and every client thread.
events: Arc<Mutex<Vec<StubEvent>>>,
// Set to `true` on drop; the accept loop checks it after each accept.
shutdown: Arc<AtomicBool>,
}
impl RedisStub {
    /// Bind to a random loopback port and start accepting connections
    /// on a background thread.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while binding the listener, reading
    /// its local address, or spawning the accept thread.
    pub fn start() -> std::io::Result<Self> {
        let listener = TcpListener::bind("127.0.0.1:0")?;
        let port = listener.local_addr()?.port();
        let events: Arc<Mutex<Vec<StubEvent>>> = Arc::new(Mutex::new(Vec::new()));
        let shutdown = Arc::new(AtomicBool::new(false));
        let events_clone = Arc::clone(&events);
        let shutdown_clone = Arc::clone(&shutdown);
        // `std::thread::spawn` panics when the OS refuses to create a
        // thread; `Builder::spawn` reports that as an `io::Error`, which
        // is what this function's signature promises. The join handle
        // is dropped on purpose: the thread runs detached until the
        // shutdown flag is set.
        std::thread::Builder::new()
            .name("nyx-redis-stub-accept".to_owned())
            .spawn(move || accept_loop(listener, events_clone, shutdown_clone))?;
        Ok(Self {
            port,
            events,
            shutdown,
        })
    }

    /// Port the listener is bound to.
    pub fn port(&self) -> u16 {
        self.port
    }

    /// Host-side helper to record a synthetic command — used by the
    /// Phase 10 integration test so we don't need a real Redis
    /// client to exercise the event capture path.
    ///
    /// The event summary is `command` followed by the space-joined
    /// `args`, trimmed. `detail["command"]` holds the ASCII-upper-cased
    /// command, matching what the wire capture path records, so an
    /// oracle predicate sees the same value whichever way the event was
    /// produced. `detail["args"]` holds the comma-joined args and is
    /// only present when `args` is non-empty. If the event log mutex is
    /// poisoned the event is silently dropped.
    pub fn record(&self, command: impl Into<String>, args: &[&str]) {
        let cmd_s = command.into();
        let summary = format!("{} {}", cmd_s, args.join(" ")).trim().to_owned();
        let mut ev = StubEvent::new(StubKind::Redis, summary)
            .with_detail("command", cmd_s.to_ascii_uppercase());
        if !args.is_empty() {
            ev = ev.with_detail("args", args.join(","));
        }
        if let Ok(mut g) = self.events.lock() {
            g.push(ev);
        }
    }
}
impl StubProvider for RedisStub {
    /// Always [`StubKind::Redis`].
    fn kind(&self) -> StubKind {
        StubKind::Redis
    }

    /// Bare `127.0.0.1:{port}` pair, no scheme prefix.
    fn endpoint(&self) -> String {
        let port = self.port;
        format!("127.0.0.1:{}", port)
    }

    /// Take every recorded event out of the shared log, leaving it
    /// empty. Returns an empty vec if the log mutex is poisoned.
    fn drain_events(&self) -> Vec<StubEvent> {
        self.events
            .lock()
            .map(|mut log| std::mem::take(&mut *log))
            .unwrap_or_default()
    }
}
impl Drop for RedisStub {
    /// Raise the shutdown flag, then make one throw-away connection to
    /// the listener so the accept loop wakes up and sees the flag. A
    /// failed connect is ignored.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Relaxed);
        let wake_addr = format!("127.0.0.1:{}", self.port);
        drop(TcpStream::connect(wake_addr));
    }
}
/// Accept connections on `listener` until `shutdown` is observed set.
///
/// The flag is checked after every accept (successful or not), so the
/// wake-up connection made on drop ends the loop. Failed accepts are
/// skipped. Each accepted connection gets 2-second read and write
/// timeouts and is served on its own thread so a slow harness does not
/// hold up later connections.
fn accept_loop(
    listener: TcpListener,
    events: Arc<Mutex<Vec<StubEvent>>>,
    shutdown: Arc<AtomicBool>,
) {
    let io_timeout = Some(Duration::from_secs(2));
    for incoming in listener.incoming() {
        if shutdown.load(Ordering::Relaxed) {
            return;
        }
        let conn = match incoming {
            Ok(conn) => conn,
            Err(_) => continue,
        };
        // Timeouts are best-effort; a failure to set them is ignored.
        let _ = conn.set_read_timeout(io_timeout);
        let _ = conn.set_write_timeout(io_timeout);
        let sink = Arc::clone(&events);
        std::thread::spawn(move || handle_client(conn, sink));
    }
}
/// Serve one client connection: read commands one at a time, record
/// each in `events`, and answer with a canned reply.
///
/// The loop ends when `read_command` yields nothing (EOF or a
/// read/parse failure), when it yields an empty command, or when
/// writing the reply fails. If the socket cannot be cloned for writing
/// the function returns immediately. A poisoned event-log mutex skips
/// the recording but the reply is still sent.
fn handle_client(stream: TcpStream, events: Arc<Mutex<Vec<StubEvent>>>) {
    let Ok(mut reply_half) = stream.try_clone() else {
        return;
    };
    let mut request_half = BufReader::new(stream);
    while let Some(parts) = read_command(&mut request_half).filter(|p| !p.is_empty()) {
        // Record before replying, so the event is in the log by the
        // time the client sees the response.
        if let Ok(mut log) = events.lock() {
            log.push(command_to_event(&parts));
        }
        let wrote = reply_half.write_all(pick_reply(&parts).as_bytes());
        if wrote.is_err() {
            break;
        }
    }
}
/// Read one command (inline or array form) from `reader`.
///
/// * Inline form: one line of whitespace-separated tokens.
/// * Array form: `*N\r\n` followed by N bulk strings, each framed as
///   `$len\r\n<len bytes>\r\n`. Bulk bytes are decoded lossily as
///   UTF-8.
///
/// Returns `None` on EOF, on any read error, on a malformed `*N` /
/// `$len` header, or when the peer closes before a declared bulk string
/// is complete. Returns `Some(vec![])` for a blank line.
///
/// The `*N` and `$len` values come straight from the (untrusted)
/// harness, so neither is used to size an allocation up front: memory
/// grows only with bytes that actually arrive. A bogus header such as
/// `*18446744073709551615` therefore yields `None` instead of a
/// capacity-overflow panic or an allocation abort in the host process.
///
/// Generic over any [`BufRead`]; the existing caller passes a
/// `BufReader<TcpStream>`.
fn read_command<R: BufRead>(reader: &mut R) -> Option<Vec<String>> {
    // Upper bound on speculative pre-allocation for the argument list.
    const PREALLOC_ARGS: usize = 16;

    let mut first = String::new();
    if reader.read_line(&mut first).ok()? == 0 {
        return None;
    }
    let first_trim = first.trim_end_matches(['\r', '\n']);
    if first_trim.is_empty() {
        return Some(Vec::new());
    }

    let Some(rest) = first_trim.strip_prefix('*') else {
        // Inline form: whitespace-separated tokens on one line.
        return Some(first_trim.split_whitespace().map(str::to_owned).collect());
    };

    // Array form: `*N\r\n` then N times `$len\r\nbulk\r\n`.
    let n: usize = rest.trim().parse().ok()?;
    let mut out = Vec::with_capacity(n.min(PREALLOC_ARGS));
    let mut hdr = String::new();
    for _ in 0..n {
        hdr.clear();
        if reader.read_line(&mut hdr).ok()? == 0 {
            return None;
        }
        let len: usize = hdr
            .trim_end_matches(['\r', '\n'])
            .strip_prefix('$')?
            .trim()
            .parse()
            .ok()?;
        // Read at most `len` bytes without reserving `len` in advance.
        let mut buf = Vec::new();
        let got = reader.by_ref().take(len as u64).read_to_end(&mut buf).ok()?;
        if got != len {
            // Peer closed before delivering the declared payload.
            return None;
        }
        // Consume trailing CRLF (best-effort, as a missing terminator
        // on the final bulk is tolerated).
        let mut crlf = [0u8; 2];
        let _ = reader.read_exact(&mut crlf);
        out.push(String::from_utf8_lossy(&buf).into_owned());
    }
    Some(out)
}
/// Turn a parsed command (`parts[0]` is the verb, the rest are
/// arguments) into a Redis `StubEvent`.
///
/// The summary is the verb and arguments joined by single spaces,
/// exactly as received. `detail["command"]` is the ASCII-upper-cased
/// verb (omitted when the verb is empty) and `detail["args"]` is the
/// comma-joined argument list (omitted when there are no arguments).
fn command_to_event(parts: &[String]) -> StubEvent {
    let (verb, rest): (&str, &[String]) = match parts {
        [head, tail @ ..] => (head.as_str(), tail),
        [] => ("", &[]),
    };

    let summary = std::iter::once(verb)
        .chain(rest.iter().map(String::as_str))
        .collect::<Vec<&str>>()
        .join(" ");

    let mut detail = BTreeMap::new();
    if !verb.is_empty() {
        detail.insert("command".to_owned(), verb.to_ascii_uppercase());
    }
    if !rest.is_empty() {
        detail.insert("args".to_owned(), rest.join(","));
    }

    StubEvent {
        kind: StubKind::Redis,
        captured_at_ns: super::monotonic_ns(),
        summary,
        detail,
    }
}
/// Choose the canned RESP reply for a command, matching the verb
/// (`parts[0]`) ASCII-case-insensitively.
///
/// * `GET`, `HGET`, `LPOP`, `RPOP` → null bulk string `$-1\r\n`
/// * `DEL`, `EXISTS`, `INCR`, `DECR`, `LLEN` → integer zero `:0\r\n`
/// * `PING` → `+PONG\r\n`
/// * anything else, including an empty command → `+OK\r\n`
fn pick_reply(parts: &[String]) -> &'static str {
    const NIL_BULK: &[&str] = &["GET", "HGET", "LPOP", "RPOP"];
    const ZERO_INT: &[&str] = &["DEL", "EXISTS", "INCR", "DECR", "LLEN"];

    let verb = parts.first().map(String::as_str).unwrap_or("");
    let is_one_of = |names: &[&str]| names.iter().any(|name| verb.eq_ignore_ascii_case(name));

    if is_one_of(NIL_BULK) {
        "$-1\r\n"
    } else if is_one_of(ZERO_INT) {
        ":0\r\n"
    } else if verb.eq_ignore_ascii_case("PING") {
        "+PONG\r\n"
    } else {
        "+OK\r\n"
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Connect to `stub`, send `wire` verbatim, wait for the 5-byte
    /// canned reply and give the server a moment to settle. The
    /// connection is returned so it stays open until the caller has
    /// finished asserting.
    fn send_and_settle(stub: &RedisStub, wire: &[u8]) -> TcpStream {
        let mut client = TcpStream::connect(format!("127.0.0.1:{}", stub.port())).unwrap();
        client.write_all(wire).unwrap();
        client.flush().unwrap();
        let mut reply = [0u8; 5];
        let _ = client.read_exact(&mut reply);
        std::thread::sleep(Duration::from_millis(50));
        client
    }

    /// The `command` detail of an event, if present.
    fn command_of(event: &StubEvent) -> Option<&str> {
        event.detail.get("command").map(String::as_str)
    }

    #[test]
    fn endpoint_has_no_scheme_prefix() {
        let stub = RedisStub::start().unwrap();
        let endpoint = stub.endpoint();
        assert!(endpoint.starts_with("127.0.0.1:"));
        assert!(!endpoint.contains("://"));
    }

    #[test]
    fn captures_inline_command() {
        let stub = RedisStub::start().unwrap();
        let _client = send_and_settle(&stub, b"SET user:1 alice\r\n");
        let events = stub.drain_events();
        assert_eq!(events.len(), 1);
        assert!(events[0].summary.starts_with("SET"));
        assert_eq!(command_of(&events[0]), Some("SET"));
    }

    #[test]
    fn captures_resp_array_command() {
        let stub = RedisStub::start().unwrap();
        // Array-framed `GET sessions`.
        let _client = send_and_settle(&stub, b"*2\r\n$3\r\nGET\r\n$8\r\nsessions\r\n");
        let events = stub.drain_events();
        assert_eq!(events.len(), 1);
        assert!(events[0].summary.contains("sessions"));
        assert_eq!(command_of(&events[0]), Some("GET"));
    }

    #[test]
    fn record_helper_lands_on_drain() {
        let stub = RedisStub::start().unwrap();
        stub.record("FLUSHALL", &[]);
        stub.record("SET", &["key", "val"]);
        let events = stub.drain_events();
        assert_eq!(events.len(), 2);
        assert!(events[0].summary.contains("FLUSHALL"));
        assert!(events[1].summary.contains("key"));
    }

    #[test]
    fn provider_kind_is_redis() {
        let stub = RedisStub::start().unwrap();
        assert_eq!(stub.kind(), StubKind::Redis);
    }
}

266
src/dynamic/stubs/sql.rs Normal file
View file

@ -0,0 +1,266 @@
//! SQL stub backed by an on-disk SQLite database file (Phase 10 — Track D.3).
//!
//! The stub creates a fresh SQLite DB inside the verifier's workdir and
//! exposes its absolute path as the endpoint. The harness opens that DB
//! with its language's driver of choice (`sqlite3` in Python, `rusqlite`
//! in Rust, `better-sqlite3` in Node, etc.) and runs queries directly —
//! no wire-protocol bridging.
//!
//! # Query recording
//!
//! The harness appends every executed query to a side log file in the
//! same directory as the DB (`nyx_sql_stub.queries.log`, a sibling of
//! the endpoint path — not `<endpoint>.log`); the stub reads that log
//! on `drain_events`. This is more flexible than a SQLite trace
//! callback because:
//!
//! 1. The harness owns its connection; a host-side trace callback
//! would only see queries against a host-owned connection.
//! 2. Drivers that wrap their own connection management (e.g.
//! `knex.pg`) cannot expose a low-level trace hook.
//! 3. The Phase 10 acceptance bullet ("captured query visible in the
//! probe output") only needs the queries available to the oracle,
//! not the driver behaviour.
//!
//! The log file is plain text with one query per line. Lines starting
//! with `# ` are treated as detail key/value pairs (e.g. `# driver:
//! psycopg2`) and stitched onto the next event.
//!
//! # Drop
//!
//! On drop the DB file and the log file are deleted along with the
//! enclosing tempdir handle.
use super::{monotonic_ns, StubEvent, StubKind, StubProvider};
use std::fs::OpenOptions;
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use tempfile::TempDir;
/// SQL-cap stub. Endpoint is the absolute path of a SQLite DB file.
///
/// Queries are not observed through SQLite itself: the stub reads them
/// back from a plain-text log file that sits next to the DB.
#[derive(Debug)]
pub struct SqlStub {
/// Tempdir holding the DB + the recording log. Drop releases both.
/// Wrapped in `Option` so `Drop` can `take()` it.
tempdir: Option<TempDir>,
/// Path to the SQLite DB file inside `tempdir`.
db_path: PathBuf,
/// Path to the query recording log file inside `tempdir`.
log_path: PathBuf,
/// Read cursor on the log file (a byte offset from the start); used
/// so `drain_events` returns only entries appended since the last
/// drain.
cursor: Mutex<u64>,
}
impl SqlStub {
    /// Create a fresh, private temp directory and place an empty DB
    /// file and an empty query log inside it.
    ///
    /// The directory is created inside `workdir` so it cannot collide
    /// with harness-staged files; if that fails for any reason (for
    /// example `workdir` is not writable) the process-wide temp
    /// location is used instead.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from the fallback temp-dir creation or
    /// from creating either file.
    pub fn start(workdir: &Path) -> std::io::Result<Self> {
        let tempdir = match TempDir::new_in(workdir) {
            Ok(dir) => dir,
            Err(_) => TempDir::new()?,
        };
        let root = tempdir.path();
        let db_path = root.join("nyx_sql_stub.db");
        let log_path = root.join("nyx_sql_stub.queries.log");
        // The DB file must exist before the harness connects, so a
        // driver never races a missing path. It starts out empty.
        std::fs::File::create(&db_path)?;
        // Likewise the log: created (and truncated) up front so a drain
        // before the first query finds an empty file.
        std::fs::File::create(&log_path)?;
        Ok(Self {
            tempdir: Some(tempdir),
            db_path,
            log_path,
            cursor: Mutex::new(0),
        })
    }

    /// Path of the SQLite DB file; the typed counterpart of
    /// `StubProvider::endpoint`.
    pub fn db_path(&self) -> &Path {
        self.db_path.as_path()
    }

    /// Path of the query log. Harnesses append one query per line
    /// here and the stub reads it back on drain.
    pub fn log_path(&self) -> &Path {
        self.log_path.as_path()
    }

    /// Host-side helper that appends `query` to the log the same way a
    /// harness would, adding a trailing newline when `query` does not
    /// already end with one. Used by the Phase 10 integration test and
    /// other test scaffolding.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from opening or writing the log file.
    pub fn record_query(&self, query: &str) -> std::io::Result<()> {
        let mut log = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&self.log_path)?;
        log.write_all(query.as_bytes())?;
        if query.ends_with('\n') {
            return Ok(());
        }
        log.write_all(b"\n")
    }
}
impl StubProvider for SqlStub {
    /// Always [`StubKind::Sql`].
    fn kind(&self) -> StubKind {
        StubKind::Sql
    }

    /// The DB file path as a (lossily converted) string.
    fn endpoint(&self) -> String {
        self.db_path.to_string_lossy().into_owned()
    }

    /// Parse every log line appended since the previous drain into
    /// events and advance the read cursor past them.
    ///
    /// * Blank lines are skipped.
    /// * A line starting with `# ` and containing a `:` is a detail
    ///   `key: value` pair; it is attached to the next query line.
    /// * Any other line becomes one event whose summary is the line.
    ///
    /// Lines are read as raw bytes and decoded lossily, so a query that
    /// contains non-UTF-8 bytes (entirely plausible for an injection
    /// payload) is still recorded. A strict UTF-8 line reader would
    /// fail on such a line without moving the cursor past it, and every
    /// later drain would stop at the same offset.
    ///
    /// Returns an empty vec if the cursor mutex is poisoned, the log
    /// cannot be opened, or seeking fails; the cursor is left untouched
    /// in those cases so a later drain retries from the same position.
    fn drain_events(&self) -> Vec<StubEvent> {
        use std::io::Seek;

        let Ok(mut cursor) = self.cursor.lock() else {
            return Vec::new();
        };
        let Ok(file) = std::fs::File::open(&self.log_path) else {
            return Vec::new();
        };
        let mut reader = BufReader::new(file);
        if reader.seek(std::io::SeekFrom::Start(*cursor)).is_err() {
            return Vec::new();
        }

        let mut events = Vec::new();
        let mut pending_detail = std::collections::BTreeMap::<String, String>::new();
        let mut consumed: u64 = 0;
        let mut raw: Vec<u8> = Vec::new();
        loop {
            raw.clear();
            let n = match reader.read_until(b'\n', &mut raw) {
                // EOF, or an I/O error: stop here. Bytes from a failed
                // read are not counted, so the next drain retries them.
                Ok(0) | Err(_) => break,
                Ok(n) => n,
            };
            consumed += n as u64;

            let decoded = String::from_utf8_lossy(&raw);
            let line = decoded.trim_end_matches(['\r', '\n']);
            if line.is_empty() {
                continue;
            }
            if let Some(rest) = line.strip_prefix("# ") {
                if let Some((k, v)) = rest.split_once(':') {
                    pending_detail.insert(k.trim().to_owned(), v.trim().to_owned());
                }
                continue;
            }
            // Moving the map out leaves `pending_detail` empty for the
            // next query.
            events.push(StubEvent {
                kind: StubKind::Sql,
                captured_at_ns: monotonic_ns(),
                summary: line.to_owned(),
                detail: std::mem::take(&mut pending_detail),
            });
        }
        *cursor += consumed;
        events
    }
}
impl Drop for SqlStub {
    /// Release the temp directory handle; dropping a `TempDir` removes
    /// the directory and everything in it (the DB and the log).
    fn drop(&mut self) {
        if let Some(dir) = self.tempdir.take() {
            drop(dir);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A scratch workdir plus a stub started inside it. The workdir is
    /// returned so it outlives the stub.
    fn fresh_stub() -> (TempDir, SqlStub) {
        let dir = TempDir::new().unwrap();
        let stub = SqlStub::start(dir.path()).unwrap();
        (dir, stub)
    }

    #[test]
    fn start_creates_db_and_log_files() {
        let (_dir, stub) = fresh_stub();
        assert!(stub.db_path().exists(), "DB file must be created");
        assert!(stub.log_path().exists(), "log file must be created");
    }

    #[test]
    fn endpoint_returns_db_path_string() {
        let (_dir, stub) = fresh_stub();
        let expected = stub.db_path().to_string_lossy();
        assert_eq!(stub.endpoint(), expected);
    }

    #[test]
    fn record_query_lands_in_drain_events() {
        let (_dir, stub) = fresh_stub();
        stub.record_query("SELECT * FROM users WHERE id = 1").unwrap();
        let events = stub.drain_events();
        assert_eq!(events.len(), 1);
        let only = &events[0];
        assert_eq!(only.kind, StubKind::Sql);
        assert!(only.summary.contains("SELECT * FROM users"));
    }

    #[test]
    fn detail_lines_stitch_onto_next_event() {
        let (_dir, stub) = fresh_stub();
        // Write a detail line followed by a query, bypassing
        // `record_query`; the block scope closes the handle before the
        // drain.
        {
            let mut log = OpenOptions::new()
                .append(true)
                .open(stub.log_path())
                .unwrap();
            log.write_all(b"# driver: psycopg2\nSELECT * FROM accounts\n")
                .unwrap();
        }
        let events = stub.drain_events();
        assert_eq!(events.len(), 1);
        let driver = events[0].detail.get("driver").map(String::as_str);
        assert_eq!(driver, Some("psycopg2"));
    }

    #[test]
    fn drain_returns_only_new_entries() {
        let (_dir, stub) = fresh_stub();
        stub.record_query("SELECT 1").unwrap();
        let first = stub.drain_events();
        assert_eq!(first.len(), 1);
        stub.record_query("SELECT 2").unwrap();
        let second = stub.drain_events();
        assert_eq!(second.len(), 1, "drain must return only the new entry");
        assert!(second[0].summary.contains("SELECT 2"));
    }

    #[test]
    fn drop_cleans_up_tempdir() {
        let (_dir, stub) = fresh_stub();
        let db = stub.db_path().to_owned();
        assert!(db.exists());
        drop(stub);
        assert!(!db.exists(), "DB file must be removed on drop");
    }

    #[test]
    fn provider_kind_is_sql() {
        let (_dir, stub) = fresh_stub();
        assert_eq!(stub.kind(), StubKind::Sql);
    }
}

View file

@ -283,6 +283,7 @@ mod tests {
sink_line: 5,
spec_hash: "abcd1234abcd1234".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}

View file

@ -11,6 +11,7 @@ use crate::dynamic::report::{AttemptSummary, VerifyResult, VerifyStatus};
use crate::dynamic::runner::{run_spec, RunError};
use crate::dynamic::sandbox::{toolchain_id_with_digest, SandboxOptions};
use crate::dynamic::spec::{HarnessSpec, SPEC_FORMAT_VERSION};
use crate::dynamic::stubs::StubHarness;
use crate::dynamic::telemetry::{self, TelemetryEvent};
use crate::dynamic::toolchain;
use crate::evidence::{InconclusiveReason, SpecDerivationStrategy, UnsupportedReason};
@ -437,8 +438,38 @@ pub fn verify_finding(diag: &Diag, opts: &VerifyOptions) -> VerifyResult {
}
}
// Phase 10 (Track D.3): spawn the boundary stubs the spec
// demands *before* the sandbox runs. When `stubs_required` is
// empty `StubHarness::start` is a no-op so the 500 ms boot budget
// for stub-less harnesses stays intact. The harness lives for
// the lifetime of this `verify_finding` call; its `Drop` releases
// listening sockets / removes tempdirs at function exit.
let stub_workdir = match opts.project_root.as_deref() {
Some(p) => p.to_owned(),
None => std::env::temp_dir(),
};
let stub_harness = match StubHarness::start(&spec.stubs_required, &stub_workdir) {
Ok(h) => Arc::new(h),
Err(_) => Arc::new(StubHarness::default()),
};
// Build a per-finding `SandboxOptions` clone that carries the
// stub endpoints + the live stub handle. This is the only place
// that mutates the caller's options; downstream cloning happens
// inside `run_spec` so the original `opts.sandbox` is left
// untouched.
let mut sandbox_opts = opts.sandbox.clone();
let mut sandbox_extra_env = sandbox_opts.extra_env.clone();
for (name, value) in stub_harness.endpoints() {
sandbox_extra_env.push((name.to_owned(), value));
}
sandbox_opts.extra_env = sandbox_extra_env;
if !stub_harness.is_empty() {
sandbox_opts.stub_harness = Some(Arc::clone(&stub_harness));
}
let start = Instant::now();
let result = run_spec(&spec, &opts.sandbox);
let result = run_spec(&spec, &sandbox_opts);
let elapsed = start.elapsed();
// Extract build_attempts before result is consumed by build_verdict.

View file

@ -0,0 +1,6 @@
// Phase 10 — FilesystemStub benign control.
//
// The harness reads a sanitised relative path that stays inside
// the fake root. Oracle's needle (`"/etc/passwd"`) is absent,
// so the verdict stays `NotConfirmed`.
read uploads/photo.png

View file

@ -0,0 +1,8 @@
// Phase 10 — FilesystemStub positive fixture (FILE_IO cap).
//
// The harness reads `NYX_FS_ROOT`, then attempts to open a
// path-traversal payload (`../../../etc/passwd`) under that root.
// The wrapper records the access; oracle:
// `Oracle::StubEvent { kind: StubKind::Filesystem, needle:
// "/etc/passwd" }` fires.
read ../../../etc/passwd

View file

@ -0,0 +1,7 @@
// Phase 10 — HttpStub benign control.
//
// Same harness shape as the vuln fixture, but the recorded request
// targets a benign host. The oracle's needle (`"169.254"`) is
// absent, so the verdict stays `NotConfirmed`.
GET /health HTTP/1.1
Host: example.com

View file

@ -0,0 +1,10 @@
// Phase 10 — HttpStub positive fixture (SSRF cap).
//
// The harness reads `NYX_HTTP_ENDPOINT`, opens a TCP connection,
// and issues a GET with an attacker-controlled path. The recorded
// summary is the request line. Oracle:
// `Oracle::StubEvent { kind: StubKind::Http, needle: "169.254" }`
// fires because the URL embeds a metadata-service host the
// untrusted user supplied.
GET /metadata HTTP/1.1
Host: 169.254.169.254

View file

@ -0,0 +1,6 @@
// Phase 10 — RedisStub benign control.
//
// The harness issues a `GET sessions` against the stub. Oracle's
// needle (`"FLUSHALL"`) is absent, so the verdict stays
// `NotConfirmed`.
GET sessions

View file

@ -0,0 +1,7 @@
// Phase 10 — RedisStub positive fixture.
//
// The harness connects to `NYX_REDIS_ENDPOINT` and issues a
// `FLUSHALL` command with the untrusted payload concatenated into
// the key. Oracle: `Oracle::StubEvent { kind: StubKind::Redis,
// needle: "FLUSHALL" }` fires because the command is destructive.
FLUSHALL

View file

@ -0,0 +1,7 @@
// Phase 10 — SqlStub benign control.
//
// Same harness shape as `vuln.txt` but the recorded query does NOT
// contain the tautology. Oracle: `Oracle::StubEvent { kind:
// StubKind::Sql, needle: "OR 1=1" }` does *not* fire so the
// verdict stays `NotConfirmed`.
SELECT * FROM users WHERE name = 'alice';

View file

@ -0,0 +1,9 @@
// Phase 10 — SqlStub positive fixture.
//
// A SQL-cap sink that interpolates an untrusted username straight
// into a SELECT. The driving harness opens the SqlStub's SQLite DB
// (`NYX_SQL_ENDPOINT`), runs the query, and records it on the
// stub. Oracle: `Oracle::StubEvent { kind: StubKind::Sql, needle:
// "OR 1=1" }` fires because the recorded summary contains the
// tautology.
SELECT * FROM users WHERE name = '' OR 1=1 --';

View file

@ -60,6 +60,8 @@ mod escape_tests {
output_limit: 65536,
oob_listener: None,
probe_channel: None,
extra_env: vec![],
stub_harness: None,
}
}

View file

@ -57,6 +57,7 @@ fn flask_spec(entry_rel: &str) -> HarnessSpec {
sink_line: 18,
spec_hash: "phase09testabcd1".into(),
derivation: SpecDerivationStrategy::FromCallgraphEntry,
stubs_required: vec![],
}
}

View file

@ -34,6 +34,7 @@ mod repro_determinism_tests {
sink_line: 10,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}
@ -166,6 +167,7 @@ mod repro_determinism_tests {
sink_line: 18,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}
@ -297,6 +299,7 @@ fn main() {
sink_line: 8,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}
@ -351,6 +354,7 @@ fn main() {
sink_line: 12,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}
@ -405,6 +409,7 @@ fn main() {
sink_line: 9,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}
@ -459,6 +464,7 @@ fn main() {
sink_line: 9,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
}
}

346
tests/stubs_per_cap.rs Normal file
View file

@ -0,0 +1,346 @@
//! Phase 10 (Track D.3) — boundary-stub providers, one positive +
//! one benign per stub kind.
//!
//! Each test wires a [`StubProvider`] to the corresponding fixture's
//! `vuln.txt` / `benign.txt` and asserts that the oracle confirms
//! only when the recorded event matches the kind-specific needle.
//! The suite synthesises harness behaviour with host-side `record_*`
//! helpers, so it runs without spawning a language toolchain; the
//! shape mirrors what a real harness would do once the per-language
//! `__nyx_probe` shims gain stub-aware wrappers.
//!
//! Acceptance bullets from `plan.md` phase 10:
//!
//! > `cargo nextest run --features dynamic --test stubs_per_cap` green.
//! > SQL-cap fixture confirms with the captured query visible in the
//! > probe output.
//! > Harness with `stubs_required: []` boots in under 500ms.
#![cfg(feature = "dynamic")]
use nyx_scanner::dynamic::oracle::{
oracle_fired_with_stubs, Oracle, ProbePredicate,
};
use nyx_scanner::dynamic::probe::{ProbeArg, ProbeChannel, SinkProbe};
use nyx_scanner::dynamic::sandbox::SandboxOutcome;
use nyx_scanner::dynamic::stubs::{
FilesystemStub, HttpStub, RedisStub, SqlStub, StubHarness, StubKind, StubProvider,
};
use std::path::PathBuf;
use std::time::Duration;
use tempfile::TempDir;
/// Build the path `tests/dynamic_fixtures/stubs/<stub_dir>/<name>` under
/// the crate root recorded in `CARGO_MANIFEST_DIR` at compile time.
fn fixture_path(stub_dir: &str, name: &str) -> PathBuf {
    let mut location = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    // Push each component in order; equivalent to a chain of `join` calls.
    location.extend(["tests", "dynamic_fixtures", "stubs", stub_dir, name]);
    location
}
/// Load the fixture `<stub_dir>/<name>` as a UTF-8 string.
///
/// # Panics
///
/// Panics with the fixture's relative name and the I/O error when the
/// file cannot be read.
fn read_fixture(stub_dir: &str, name: &str) -> String {
    let location = fixture_path(stub_dir, name);
    match std::fs::read_to_string(location) {
        Ok(contents) => contents,
        Err(e) => panic!("read fixture {stub_dir}/{name}: {e}"),
    }
}
/// Extract the last non-comment, non-blank line. Fixture comments
/// begin with `//`; the payload is the surviving line.
///
/// The returned line has surrounding whitespace removed. An empty string
/// is returned when every line is blank or a comment (including when `s`
/// itself is empty).
fn extract_payload(s: &str) -> String {
    s.lines()
        // `Lines` is double-ended, so search from the back and stop at the
        // first hit instead of filtering the whole input and taking `last()`.
        .rev()
        // Trim once up front; the blank check, the comment check and the
        // returned value all work on the trimmed text.
        .map(str::trim)
        .find(|line| !line.is_empty() && !line.starts_with("//"))
        .unwrap_or("")
        .to_owned()
}
/// Minimal [`SandboxOutcome`] passed to the oracle in every test here:
/// exit code 0, empty stdout/stderr, no timeout, no OOB callback,
/// `sink_hit` set, and a 1 ms duration.
fn empty_outcome() -> SandboxOutcome {
    SandboxOutcome {
        duration: Duration::from_millis(1),
        exit_code: Some(0),
        timed_out: false,
        sink_hit: true,
        oob_callback_seen: false,
        stdout: Vec::new(),
        stderr: Vec::new(),
    }
}
// ── SQL stub ─────────────────────────────────────────────────────────
/// Positive SQL case: the payload line of `sql/vuln.txt` is recorded on a
/// [`SqlStub`], and an `Oracle::StubEvent` looking for `OR 1=1` must fire
/// on the drained events.
#[test]
fn sql_stub_vuln_fixture_confirms_with_captured_query() {
    const TAUTOLOGY: &str = "OR 1=1";

    let scratch = TempDir::new().unwrap();
    let sql = SqlStub::start(scratch.path()).unwrap();

    // Stand in for a real harness: pull the query out of the fixture and
    // log it on the stub as though it had just been executed.
    let fixture = read_fixture("sql", "vuln.txt");
    let query = extract_payload(&fixture);
    assert!(query.contains(TAUTOLOGY), "vuln fixture must carry a tautology");
    sql.record_query(&query).unwrap();

    let captured = sql.drain_events();
    assert_eq!(captured.len(), 1, "stub must have captured the executed query");
    let summary = &captured[0].summary;
    assert!(
        summary.contains(TAUTOLOGY),
        "captured query must be visible in probe output: {:?}",
        summary,
    );

    let oracle = Oracle::StubEvent {
        kind: StubKind::Sql,
        needle: TAUTOLOGY,
    };
    let confirmed = oracle_fired_with_stubs(&oracle, &empty_outcome(), &[], &captured);
    assert!(confirmed, "SQL stub oracle must confirm the captured tautology");
}
/// Benign SQL control: the payload line of `sql/benign.txt` is recorded on
/// a [`SqlStub`], and the `OR 1=1` stub-event oracle must stay silent.
#[test]
fn sql_stub_benign_fixture_does_not_confirm() {
    const TAUTOLOGY: &str = "OR 1=1";

    let scratch = TempDir::new().unwrap();
    let sql = SqlStub::start(scratch.path()).unwrap();

    let fixture = read_fixture("sql", "benign.txt");
    let query = extract_payload(&fixture);
    assert!(!query.contains(TAUTOLOGY), "benign control must lack tautology");
    sql.record_query(&query).unwrap();

    let captured = sql.drain_events();
    let oracle = Oracle::StubEvent {
        kind: StubKind::Sql,
        needle: TAUTOLOGY,
    };
    let fired = oracle_fired_with_stubs(&oracle, &empty_outcome(), &[], &captured);
    assert!(!fired, "benign control must not satisfy the oracle");
}
/// Checks that `ProbePredicate::StubEventMatches` is honoured when it is
/// listed next to a per-probe predicate inside `Oracle::SinkProbe`.
///
/// The payload of `sql/vuln.txt` is recorded on the stub and is also the
/// only argument of a synthetic `sqlite3.execute` probe, so both the
/// `CalleeEquals` and the `StubEventMatches` predicate have data to match.
#[test]
fn sql_stub_captured_query_threads_through_probe_predicate() {
    // The plan calls for `ProbePredicate::StubEventMatches` as a
    // cross-cutting predicate inside `Oracle::SinkProbe`. Confirm
    // the predicate path fires with the same fixture.
    let dir = TempDir::new().unwrap();
    let stub = SqlStub::start(dir.path()).unwrap();
    let payload = extract_payload(&read_fixture("sql", "vuln.txt"));
    stub.record_query(&payload).unwrap();
    let events = stub.drain_events();
    // Pair the stub-event check with a per-probe `CalleeEquals` so
    // we exercise the predicate-partition path in
    // `oracle_fired_with_stubs`.
    let probe = SinkProbe {
        sink_callee: "sqlite3.execute".into(),
        // `payload` has no later use, so hand it over instead of cloning.
        args: vec![ProbeArg::String(payload)],
        captured_at_ns: 1,
        payload_id: "sql-tautology".into(),
        kind: Default::default(),
        witness: Default::default(),
    };
    let oracle = Oracle::SinkProbe {
        predicates: &[
            ProbePredicate::CalleeEquals("sqlite3.execute"),
            ProbePredicate::StubEventMatches {
                kind: StubKind::Sql,
                needle: "OR 1=1",
            },
        ],
    };
    assert!(
        oracle_fired_with_stubs(&oracle, &empty_outcome(), &[probe], &events),
        "ProbePredicate::StubEventMatches must satisfy when stub log has needle",
    );
}
// ── HTTP stub ────────────────────────────────────────────────────────
/// Positive HTTP case: the payload line of `http/vuln.txt` is recorded on
/// an [`HttpStub`], and an `Oracle::StubEvent` looking for `169.254` must
/// fire on the single drained event.
#[test]
fn http_stub_vuln_fixture_confirms_recorded_request() {
    let stub = HttpStub::start().unwrap();
    let payload = extract_payload(&read_fixture("http", "vuln.txt"));
    assert!(payload.contains("169.254"), "vuln fixture must carry metadata host");
    // `payload` is not read again, so move it into the stub rather than
    // cloning (the benign sibling test passes it by value the same way).
    stub.record(payload);
    let events = stub.drain_events();
    assert_eq!(events.len(), 1);
    assert!(events[0].summary.contains("169.254"));
    let oracle = Oracle::StubEvent {
        kind: StubKind::Http,
        needle: "169.254",
    };
    assert!(oracle_fired_with_stubs(&oracle, &empty_outcome(), &[], &events));
}
/// Benign HTTP control: the payload line of `http/benign.txt` is recorded
/// on an [`HttpStub`], and the `169.254` stub-event oracle must not fire.
#[test]
fn http_stub_benign_fixture_does_not_confirm() {
    let oracle = Oracle::StubEvent {
        kind: StubKind::Http,
        needle: "169.254",
    };

    let http = HttpStub::start().unwrap();
    let fixture = read_fixture("http", "benign.txt");
    http.record(extract_payload(&fixture));
    let events = http.drain_events();

    assert!(!oracle_fired_with_stubs(&oracle, &empty_outcome(), &[], &events));
}
// ── Redis stub ───────────────────────────────────────────────────────
/// Positive Redis case: the payload line of `redis/vuln.txt` is recorded
/// as a command with no arguments, and an `Oracle::StubEvent` looking for
/// `FLUSHALL` must fire on the drained events.
#[test]
fn redis_stub_vuln_fixture_confirms_destructive_command() {
    let oracle = Oracle::StubEvent {
        kind: StubKind::Redis,
        needle: "FLUSHALL",
    };

    let redis = RedisStub::start().unwrap();
    let fixture = read_fixture("redis", "vuln.txt");
    let payload = extract_payload(&fixture);
    assert!(payload.contains("FLUSHALL"));

    // The whole payload line is logged as the command; no separate args.
    redis.record(payload, &[]);
    let events = redis.drain_events();

    assert!(oracle_fired_with_stubs(&oracle, &empty_outcome(), &[], &events));
}
/// Benign Redis control: the payload line of `redis/benign.txt` is split
/// on whitespace into a command and its arguments, recorded on a
/// [`RedisStub`], and the `FLUSHALL` stub-event oracle must not fire.
#[test]
fn redis_stub_benign_fixture_does_not_confirm() {
    let oracle = Oracle::StubEvent {
        kind: StubKind::Redis,
        needle: "FLUSHALL",
    };

    let redis = RedisStub::start().unwrap();
    let fixture = read_fixture("redis", "benign.txt");
    let payload = extract_payload(&fixture);

    // First token is the command (empty if the payload is blank); the
    // remaining tokens are its arguments.
    let mut tokens = payload.split_whitespace();
    let cmd = tokens.next().unwrap_or("");
    let args = tokens.collect::<Vec<&str>>();
    redis.record(cmd, &args);

    let events = redis.drain_events();
    assert!(!oracle_fired_with_stubs(&oracle, &empty_outcome(), &[], &events));
}
// ── Filesystem stub ──────────────────────────────────────────────────
/// Positive filesystem case: the payload line of `filesystem/vuln.txt` is
/// recorded as an access on a [`FilesystemStub`], and an
/// `Oracle::StubEvent` looking for `/etc/passwd` must fire.
#[test]
fn filesystem_stub_vuln_fixture_confirms_path_traversal() {
    let oracle = Oracle::StubEvent {
        kind: StubKind::Filesystem,
        needle: "/etc/passwd",
    };

    let root = TempDir::new().unwrap();
    let fs_stub = FilesystemStub::start(root.path()).unwrap();
    let fixture = read_fixture("filesystem", "vuln.txt");
    let payload = extract_payload(&fixture);

    // Payload shape is `<op> <path>`; a payload without a space is treated
    // as a `read` of the whole string.
    let (op, path) = match payload.split_once(' ') {
        Some(pair) => pair,
        None => ("read", payload.as_str()),
    };
    fs_stub.record_access(op, path);

    let events = fs_stub.drain_events();
    assert!(oracle_fired_with_stubs(&oracle, &empty_outcome(), &[], &events));
}
/// Benign filesystem control: the payload line of `filesystem/benign.txt`
/// is recorded as an access on a [`FilesystemStub`], and the
/// `/etc/passwd` stub-event oracle must not fire.
#[test]
fn filesystem_stub_benign_fixture_does_not_confirm() {
    let oracle = Oracle::StubEvent {
        kind: StubKind::Filesystem,
        needle: "/etc/passwd",
    };

    let root = TempDir::new().unwrap();
    let fs_stub = FilesystemStub::start(root.path()).unwrap();
    let fixture = read_fixture("filesystem", "benign.txt");
    let payload = extract_payload(&fixture);

    // Payload shape is `<op> <path>`; a payload without a space is treated
    // as a `read` of the whole string.
    let (op, path) = match payload.split_once(' ') {
        Some(pair) => pair,
        None => ("read", payload.as_str()),
    };
    fs_stub.record_access(op, path);

    let events = fs_stub.drain_events();
    assert!(!oracle_fired_with_stubs(&oracle, &empty_outcome(), &[], &events));
}
// ── Performance invariant ────────────────────────────────────────────
/// Performance invariant from the phase 10 acceptance list: starting a
/// [`StubHarness`] with no stubs requested must take less than 500 ms.
/// Only the `StubHarness::start` call itself is timed.
#[test]
fn empty_stubs_required_boots_under_500ms() {
    let budget = Duration::from_millis(500);
    let dir = TempDir::new().unwrap();

    let clock = std::time::Instant::now();
    let h = StubHarness::start(&[], dir.path()).unwrap();
    let elapsed = clock.elapsed();

    assert!(h.is_empty());
    assert!(
        elapsed < budget,
        "stubs_required=[] must boot in <500ms, took {elapsed:?}",
    );
}
/// Starts a [`StubHarness`] with all four stub kinds and checks that the
/// endpoint list it reports names each of the expected environment
/// variables.
#[test]
fn harness_endpoints_carry_well_known_env_names() {
    // Request every kind so the whole kind -> env-var mapping is observed
    // through the aggregating harness rather than stub by stub.
    let every_kind = [
        StubKind::Sql,
        StubKind::Http,
        StubKind::Redis,
        StubKind::Filesystem,
    ];
    let dir = TempDir::new().unwrap();
    let h = StubHarness::start(&every_kind, dir.path()).unwrap();

    let names: Vec<&str> = h.endpoints().iter().map(|(name, _)| *name).collect();
    assert!(names.contains(&"NYX_SQL_ENDPOINT"));
    assert!(names.contains(&"NYX_HTTP_ENDPOINT"));
    assert!(names.contains(&"NYX_REDIS_ENDPOINT"));
    assert!(names.contains(&"NYX_FS_ROOT"));
}
/// Records one event on a [`SqlStub`] and one on a [`FilesystemStub`],
/// merges the two drained logs, and checks that both `StubKind::Sql` and
/// `StubKind::Filesystem` appear among the events' `kind` fields.
#[test]
fn drained_events_are_kind_tagged() {
    // An oracle filtering on `StubEventMatches { kind, .. }` needs every
    // event to name its source stub, with no bookkeeping on the side.
    let dir = TempDir::new().unwrap();
    let sql = SqlStub::start(dir.path()).unwrap();
    let fs = FilesystemStub::start(dir.path()).unwrap();

    sql.record_query("SELECT 1").unwrap();
    fs.record_access("read", "/tmp/x");

    // SQL events first, filesystem events appended after them.
    let all: Vec<_> = sql
        .drain_events()
        .into_iter()
        .chain(fs.drain_events())
        .collect();

    let kinds: Vec<StubKind> = all.iter().map(|e| e.kind).collect();
    assert!(kinds.contains(&StubKind::Sql));
    assert!(kinds.contains(&StubKind::Filesystem));
}
/// Covers the plan's wording "SQL-cap fixture confirms with the captured
/// query visible in the probe output": the first event drained from the
/// [`SqlStub`] is serialised with `serde_json` and must still contain
/// `OR 1=1`. A [`ProbeChannel`] opened on a separate temp directory must
/// drain empty.
#[test]
fn sql_stub_captured_query_visible_in_probe_output() {
    let stub_dir = TempDir::new().unwrap();
    let sql = SqlStub::start(stub_dir.path()).unwrap();
    let fixture = read_fixture("sql", "vuln.txt");
    sql.record_query(&extract_payload(&fixture)).unwrap();

    let events = sql.drain_events();
    let event = events.first().expect("captured event");

    // Serialise the event so the check operates on the JSON text form of
    // the record rather than on the in-memory struct.
    let serialised = serde_json::to_string(event).unwrap();
    assert!(
        serialised.contains("OR 1=1"),
        "captured query must survive serialisation: {serialised}",
    );

    // The query was recorded on the stub's event log only, so a probe
    // channel opened on its own workdir has nothing to drain.
    let workdir = TempDir::new().unwrap();
    let channel = ProbeChannel::for_workdir(workdir.path()).unwrap();
    assert!(channel.drain().is_empty());
}