nyx/src/dynamic/probe.rs

581 lines
24 KiB
Rust

//! Structured sink-probe channel (Phase 06 — Track C.1).
//!
//! Replaces the brittle stdout-substring matching path with a per-run JSON-line
//! channel. Each harness defines a `__nyx_probe` shim (see the per-language
//! emitter in [`crate::dynamic::lang`]) that writes one [`SinkProbe`] record
//! to the channel when the instrumented sink fires. After each sandbox run
//! the runner calls [`ProbeChannel::drain`] and the oracle (see
//! [`crate::dynamic::oracle::oracle_fired`]) evaluates a payload's
//! [`crate::dynamic::oracle::ProbePredicate`] set against the captured args.
//!
//! # Phase 08 extensions (Track C.4 + C.5)
//!
//! - [`ProbeKind`] discriminates a normal sink observation from a crash
//! intercepted by a sink-site signal handler. The handler stamps
//! `ProbeKind::Crash { signal }` onto the probe before re-raising so the
//! oracle can distinguish "the sink crashed under my payload"
//! (Confirmed) from "some unrelated setup code crashed"
//! (Inconclusive(UnrelatedCrash)).
//! - [`ProbeWitness`] carries bounded forensic data — scrubbed env, cwd,
//! payload-bytes prefix, callee, args repr — so downstream repro and
//! chain composition need only the probe file, not a live sandbox. All
//! bounding goes through [`crate::dynamic::policy`].
//!
//! # Channel medium
//!
//! Currently file-based: one JSON record per line at
//! `<workdir>/__nyx_probes.jsonl`. The path is exposed to the harness via
//! the `NYX_PROBE_PATH` env var (see [`PROBE_PATH_ENV`]). Named-pipe (FIFO)
//! transport is deferred; the file variant works on every platform the
//! sandbox supports and matches the drain-after-run lifecycle the runner
//! actually uses — there are no streaming consumers.
//!
//! Records are appended, so a single payload can fire the shim multiple
//! times (e.g. inside a retry loop) and the oracle sees every observation.
//! The runner truncates the file via [`ProbeChannel::clear`] before each
//! payload to keep verdicts independent.
use crate::dynamic::oracle::Signal;
use crate::dynamic::policy;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
/// File name of the file-backed probe channel inside a harness workdir.
///
/// [`ProbeChannel::for_workdir`] joins this name onto the workdir to build
/// the channel path. The harness shim and the runner both build their paths
/// off this constant so they cannot drift apart.
pub const PROBE_FILENAME: &str = "__nyx_probes.jsonl";
/// Name of the environment variable that carries the probe channel's path
/// into the harness process.
///
/// Read by the per-language `__nyx_probe` shim. The value is meant to be the
/// absolute path of the probe file.
// NOTE(review): nothing in this module canonicalises the path, so it is only
// absolute if the runner constructs the channel from an absolute workdir.
pub const PROBE_PATH_ENV: &str = "NYX_PROBE_PATH";
/// Identifier of the payload that triggered a probe.
///
/// Currently the static [`crate::dynamic::corpus::CuratedPayload::label`]
/// string; future fuzzer-generated payloads will use the corpus hash.
/// Stored on every record as [`SinkProbe::payload_id`].
pub type PayloadId = String;
/// One argument value observed at the sink call site.
///
/// The harness shim selects the variant from the argument's runtime type,
/// which lets the oracle run byte-level predicates without first pushing the
/// value through a lossy string conversion.
///
/// On the wire the variant name is stored under `"kind"` and the payload
/// under `"value"`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "value")]
pub enum ProbeArg {
    /// Textual (UTF-8) argument.
    String(String),
    /// Raw byte buffer (e.g. `bytes` in Python, `Buffer` in Node).
    Bytes(Vec<u8>),
    /// Integer argument, held as a signed 64-bit value.
    Int(i64),
}
impl ProbeArg {
/// String view, when the arg is textual. Returns `None` for `Int` and
/// non-UTF-8 `Bytes`.
pub fn as_str(&self) -> Option<&str> {
match self {
ProbeArg::String(s) => Some(s.as_str()),
ProbeArg::Bytes(b) => std::str::from_utf8(b).ok(),
ProbeArg::Int(_) => None,
}
}
/// Byte view, when the arg is byte-shaped. Returns `None` for `Int`.
pub fn as_bytes(&self) -> Option<&[u8]> {
match self {
ProbeArg::String(s) => Some(s.as_bytes()),
ProbeArg::Bytes(b) => Some(b),
ProbeArg::Int(_) => None,
}
}
/// Integer view, when the arg is `Int`.
pub fn as_int(&self) -> Option<i64> {
match self {
ProbeArg::Int(i) => Some(*i),
_ => None,
}
}
}
/// Discriminator carried by every [`SinkProbe`] (Phase 08 — Track C.4).
///
/// Separates a probe written on the normal sink-instrumentation path from
/// one written by a sink-site signal handler after the sink invocation
/// crashed under the active payload, and from the sink-specific shims that
/// attach an extra measurement. The oracle's
/// [`crate::dynamic::oracle::Oracle::SinkCrash`] variant ignores anything
/// other than `Crash { signal }`, so a process-level abort outside the sink
/// no longer satisfies the oracle.
///
/// On the wire the variant name is stored under `"kind"`, alongside the
/// variant's own fields.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum ProbeKind {
    /// Ordinary sink observation: the arguments were captured before the
    /// sink returned normally (or raised a non-crash exception).
    Normal,

    /// The sink call was cut short by a fatal signal that the sink-site
    /// handler intercepted. The handler writes the probe first and then
    /// re-raises, so the runner's outcome still records the process death.
    Crash {
        /// The signal the handler observed interrupting the sink call.
        signal: Signal,
    },

    /// Phase 03 (Track J.1) deserialization-sink observation.
    ///
    /// Stamped by the per-language harness shim when the instrumented
    /// deserialiser (`ObjectInputStream.resolveClass`,
    /// `pickle.Unpickler.find_class`, `unserialize` `__wakeup`,
    /// `Marshal.load` const lookup) is asked to materialise a class outside
    /// the harness's allowlist.
    Deserialize {
        /// `true` when the disallowed gadget class was actually
        /// instantiated / executed (the gadget chain ran) before the shim
        /// aborted it; `false` when the shim caught it at the resolution
        /// boundary before any sink effect.
        gadget_chain_invoked: bool,
    },

    /// Phase 05 (Track J.3) XXE-sink observation.
    ///
    /// Stamped by the per-language XML harness shim when the instrumented
    /// parser (`DocumentBuilder.parse`, `lxml.etree.XMLParser`,
    /// `simplexml_load_string` under `libxml_disable_entity_loader(false)`,
    /// `encoding/xml.Decoder` with `Strict: false`, Ruby `REXML` /
    /// `Nokogiri::XML`) consumes a payload carrying a `<!ENTITY …>`
    /// declaration that the parser then expands inside the document body.
    Xxe {
        /// `true` when the parser substituted the entity body into the
        /// parsed XML output — the differential rule's proof that XXE
        /// expansion fired. `false` when the parser refused the doctype /
        /// external resolution (the benign `disallow-doctype-decl`
        /// control).
        entity_expanded: bool,
    },

    /// Phase 06 (Track J.4) LDAP-sink observation.
    ///
    /// Stamped by the per-language LDAP harness shim when the instrumented
    /// client (`LdapTemplate.search`, `ldap.search_s`, `ldap_search`)
    /// issues a filter against the in-sandbox
    /// [`ldap_server`](crate::dynamic::stubs::ldap_server) stub. The
    /// differential oracle's
    /// [`crate::dynamic::oracle::ProbePredicate::QueryResultCountGreaterThan`]
    /// fires when `entries_returned > n`, which catches a malicious filter
    /// (e.g. `*)(uid=*`) that matched more than the originally-intended
    /// user. Benign filter-quoted controls produce `entries_returned == 1`.
    Ldap {
        /// Number of directory entries the stub LDAP server returned for
        /// the payload's filter.
        entries_returned: u32,
    },

    /// Phase 07 (Track J.5) XPath-sink observation.
    ///
    /// Stamped by the per-language XPath harness shim when the instrumented
    /// evaluator (`javax.xml.xpath.XPath.evaluate`, `lxml.etree.xpath`,
    /// `DOMXPath::query`, the npm `xpath` package's `select`) runs an XPath
    /// expression against the canonical XML document staged in the workdir
    /// (`xpath_corpus.xml`). The differential oracle's
    /// [`crate::dynamic::oracle::ProbePredicate::QueryResultCountGreaterThan`]
    /// fires when `nodes_returned > n`, which catches a malicious
    /// expression (e.g. `' or '1'='1`) that selected every node. Benign
    /// quoted controls produce `nodes_returned == 1`.
    Xpath {
        /// Number of XML nodes the staged document returned for the
        /// payload's XPath expression.
        nodes_returned: u32,
    },

    /// Phase 08 (Track J.6) HTTP-response-header-write observation.
    ///
    /// Stamped by the per-language harness shim's instrumented header
    /// setter (`HttpServletResponse.setHeader`,
    /// `flask.Response.headers.__setitem__`, `header(...)`,
    /// `Rack::Response#set_header`, `res.setHeader`, `w.Header().Set`,
    /// `HeaderMap::insert`). Exactly one probe is recorded per
    /// `setHeader(name, value)` call, carrying the raw bytes the host
    /// process emitted. The
    /// [`crate::dynamic::oracle::ProbePredicate::HeaderInjected`] predicate
    /// scans `value` for an embedded `\r\n` byte pair — the signal that the
    /// attacker payload split one header into two on the wire.
    HeaderEmit {
        /// Header name the host attempted to set (e.g. `"Set-Cookie"`,
        /// `"Location"`), echoed verbatim so the predicate can pin
        /// per-header expectations without name normalisation.
        name: String,
        /// Raw header value the host attempted to set. A vulnerable host
        /// concatenates attacker bytes into this string without CRLF
        /// stripping; a benign host URL-encodes them (`%0d%0a`).
        value: String,
    },
}
impl Default for ProbeKind {
fn default() -> Self {
ProbeKind::Normal
}
}
/// Bounded forensic snapshot attached to a [`SinkProbe`]
/// (Phase 08 — Track C.5).
///
/// Everything that lands in a witness is policed by
/// [`crate::dynamic::policy`]: env keys are scrubbed against
/// [`crate::dynamic::policy::DENY_KEY_SUBSTRINGS`] and payload bytes are
/// truncated at [`crate::dynamic::policy::PAYLOAD_CAPTURE_LIMIT_BYTES`].
///
/// Every field is `#[serde(default, skip_serializing_if = "...")]`, so
/// host-emitted probes (which carry no witness) and per-language
/// shim-emitted probes (which do) round-trip through the same JSON schema.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProbeWitness {
    /// Scrubbed copy of the harness process environment at probe time.
    /// Keys matching a deny substring carry
    /// [`crate::dynamic::policy::REDACTED_VALUE`] instead of their value.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub env_snapshot: BTreeMap<String, String>,

    /// Working directory of the harness when the probe fired; empty when
    /// the language shim could not determine it.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub cwd: String,

    /// Leading bytes of the payload routed into the sink, capped at
    /// [`crate::dynamic::policy::PAYLOAD_CAPTURE_LIMIT_BYTES`].
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub payload_bytes: Vec<u8>,

    /// Same callee name as [`SinkProbe::sink_callee`], duplicated here so
    /// repro tooling can consume the witness in isolation.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub callee: String,

    /// Human-readable repr of each argument, parallel to
    /// [`SinkProbe::args`]: a plain `String` for textual / numeric args and
    /// `"<bytes:N>"` for binary payloads the shim chose not to inline.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub args_repr: Vec<String>,
}
impl ProbeWitness {
/// An empty witness — every field at its `Default` value. Used by
/// tests and the host-side [`ProbeChannel::write`] path that does
/// not snapshot any forensic state.
pub fn empty() -> Self {
Self::default()
}
/// Construct a bounded witness from raw inputs. Goes through
/// [`crate::dynamic::policy::scrub_env`],
/// [`crate::dynamic::policy::truncate_payload_bytes`], and
/// [`crate::dynamic::policy::Scrubber`] (Phase 28 — Track H.5) so
/// the host-side constructor cannot accidentally produce an
/// unscrubbed / unbounded witness. Every textual field
/// (`env_snapshot` values, `cwd`, each `args_repr` entry) is routed
/// through the scrubber before the witness is serialised, and the
/// truncated `payload_bytes` slice is routed through the
/// byte-aware [`crate::dynamic::policy::Scrubber::scrub_bytes`] so
/// real-world payloads carrying credential tokens are replaced with
/// a deterministic same-length placeholder while curated corpus
/// payloads pass through unchanged.
pub fn from_inputs<I, S>(
env: I,
cwd: impl Into<String>,
payload: &[u8],
callee: impl Into<String>,
args_repr: Vec<String>,
) -> Self
where
I: IntoIterator<Item = (S, S)>,
S: Into<String>,
{
let scrubber = policy::Scrubber::project_default();
let env_snapshot: BTreeMap<String, String> = policy::scrub_env(env)
.into_iter()
.map(|(k, v)| (k, scrubber.scrub_string(&v)))
.collect();
let scrubbed_args: Vec<String> = args_repr
.into_iter()
.map(|s| scrubber.scrub_string(&s))
.collect();
let scrubbed_callee = scrubber.scrub_string(&callee.into());
let scrubbed_cwd = scrubber.scrub_string(&cwd.into());
let truncated = policy::truncate_payload_bytes(payload);
let scrubbed_payload = scrubber.scrub_bytes(truncated);
Self {
env_snapshot,
cwd: scrubbed_cwd,
payload_bytes: scrubbed_payload,
callee: scrubbed_callee,
args_repr: scrubbed_args,
}
}
}
/// A single structured observation, written by the harness each time the
/// instrumented sink fires.
///
/// Each probe is serialised as one JSON object on its own line of the
/// channel file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SinkProbe {
    /// Name of the sink that fired, either fully qualified or just the last
    /// segment (e.g. `"os.system"`, `"Runtime.exec"`).
    pub sink_callee: String,

    /// Positional arguments captured at the call, in left-to-right order.
    /// Empty when the sink takes none or the shim could not introspect
    /// them.
    pub args: Vec<ProbeArg>,

    /// Monotonic-ish nanosecond timestamp taken at write time. Only useful
    /// for ordering probes within one run; the absolute value carries no
    /// meaning across runs.
    pub captured_at_ns: u64,

    /// Identifier of the payload that was in flight when the probe fired.
    pub payload_id: PayloadId,

    /// Phase 08: whether this is a normal sink observation or a sink-site
    /// crash. Falls back to `Normal` so probes written by the Phase 06
    /// shims (no `kind` field on the wire) deserialise as normal
    /// observations.
    #[serde(default)]
    pub kind: ProbeKind,

    /// Phase 08: bounded forensic snapshot. Left at its default (empty)
    /// when the shim did not capture one, so older probe files round-trip
    /// unchanged.
    #[serde(default)]
    pub witness: ProbeWitness,
}
/// Per-run handle on a file-backed [`SinkProbe`] channel.
///
/// The channel is a plain file holding one JSON record per line.
/// Construction creates / truncates the underlying file;
/// [`clear`](ProbeChannel::clear) re-truncates between payload runs;
/// [`drain`](ProbeChannel::drain) reads every record currently in the file;
/// [`write`](ProbeChannel::write) appends one record from the host side.
#[derive(Debug)]
pub struct ProbeChannel {
/// Location of the probe file, fixed at construction time.
path: PathBuf,
/// Serialises read / write / truncate operations against the underlying
/// file from the host side. The harness process writes from its own
/// address space; this lock only protects host-side callers (test
/// helpers, the runner). It guards no data of its own, hence `Mutex<()>`.
io_lock: Mutex<()>,
}
impl ProbeChannel {
    /// Construct a channel rooted at `<workdir>/__nyx_probes.jsonl`.
    ///
    /// Creates the file (truncating any previous contents) so a stale probe
    /// file left over from a prior workdir reuse cannot poison the next
    /// run's oracle.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error when the file cannot be created,
    /// e.g. because `workdir` does not exist.
    pub fn for_workdir(workdir: &Path) -> std::io::Result<Self> {
        Self::at_path(workdir.join(PROBE_FILENAME))
    }

    /// Construct a channel at an explicit path (test helper).
    ///
    /// Behaves like [`for_workdir`](ProbeChannel::for_workdir) — the file is
    /// created or truncated — but assumes no directory layout.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error when the file cannot be created.
    pub fn at_path(path: PathBuf) -> std::io::Result<Self> {
        File::create(&path)?;
        Ok(Self {
            path,
            io_lock: Mutex::new(()),
        })
    }

    /// Path of the probe file, exactly as supplied at construction time.
    ///
    /// Forwarded to the harness process via the `NYX_PROBE_PATH` env var.
    /// The path is not canonicalised here, so it is absolute only when the
    /// workdir / path handed to the constructor was.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Truncate the channel between payload runs.
    ///
    /// Cheap: a single `File::create` on the existing path.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error when the file cannot be recreated.
    pub fn clear(&self) -> std::io::Result<()> {
        let _guard = self.lock_io();
        File::create(&self.path)?;
        Ok(())
    }

    /// Read every record currently in the channel file.
    ///
    /// Malformed lines — blank lines, truncated writes, partial flushes,
    /// lines that are not valid UTF-8, or anything that does not parse as a
    /// [`SinkProbe`] — are skipped silently; the oracle treats a missing
    /// probe as "sink did not fire" without distinguishing causes. A file
    /// that cannot be opened yields an empty vector, and a genuine read
    /// error ends the drain with whatever was collected up to that point.
    pub fn drain(&self) -> Vec<SinkProbe> {
        let _guard = self.lock_io();
        let file = match File::open(&self.path) {
            Ok(file) => file,
            Err(_) => return Vec::new(),
        };
        // Split on raw bytes rather than `lines()`: `lines()` reports a
        // non-UTF-8 line as an `Err`, which would end the `map_while` and
        // discard every valid record after one corrupt line.
        BufReader::new(file)
            .split(b'\n')
            .map_while(Result::ok)
            .filter_map(|raw| {
                let text = std::str::from_utf8(&raw).ok()?.trim();
                if text.is_empty() {
                    return None;
                }
                serde_json::from_str::<SinkProbe>(text).ok()
            })
            .collect()
    }

    /// Append a probe record from the host side.
    ///
    /// Primarily a test helper: in production the harness process writes
    /// directly via its per-language shim, bypassing this entry point.
    ///
    /// # Errors
    ///
    /// Returns `InvalidData` when the probe cannot be serialised, or the
    /// underlying I/O error when the file cannot be opened or written.
    pub fn write(&self, probe: &SinkProbe) -> std::io::Result<()> {
        // Serialise before touching the file so a serialisation failure has
        // no filesystem side effect.
        let mut record = serde_json::to_string(probe)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        record.push('\n');

        let _guard = self.lock_io();
        let mut file = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&self.path)?;
        // Record and terminator go out in one `write_all`, which narrows the
        // window in which another appender could land bytes between them.
        file.write_all(record.as_bytes())
    }

    /// Acquire the host-side I/O lock, recovering it if it was poisoned.
    ///
    /// The mutex protects no data (`Mutex<()>`), so a panic in a previous
    /// holder leaves nothing inconsistent; taking the guard out of the
    /// poison error keeps later operations serialised instead of silently
    /// running without the lock.
    fn lock_io(&self) -> std::sync::MutexGuard<'_, ()> {
        self.io_lock
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}
// Unit tests for the probe channel: file round-trips, tolerance of malformed
// input, legacy wire-format compatibility, and witness scrubbing / bounding.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Build a minimal `Normal` probe for `os.system` whose only varying part is
// the payload id.
fn sample_probe(label: &str) -> SinkProbe {
SinkProbe {
sink_callee: "os.system".into(),
args: vec![ProbeArg::String("ls; whoami".into())],
captured_at_ns: 42,
payload_id: label.into(),
kind: ProbeKind::Normal,
witness: ProbeWitness::empty(),
}
}
// Records written through `write` come back from `drain` in write order.
#[test]
fn channel_round_trip_writes_and_drains() {
let dir = TempDir::new().unwrap();
let ch = ProbeChannel::for_workdir(dir.path()).unwrap();
ch.write(&sample_probe("cmdi-echo-marker")).unwrap();
ch.write(&sample_probe("cmdi-echo-marker-2")).unwrap();
let probes = ch.drain();
assert_eq!(probes.len(), 2);
assert_eq!(probes[0].payload_id, "cmdi-echo-marker");
assert_eq!(probes[1].payload_id, "cmdi-echo-marker-2");
}
// `clear` truncates the file, so nothing written before it is drained.
#[test]
fn drain_after_clear_returns_empty() {
let dir = TempDir::new().unwrap();
let ch = ProbeChannel::for_workdir(dir.path()).unwrap();
ch.write(&sample_probe("a")).unwrap();
ch.clear().unwrap();
assert!(ch.drain().is_empty());
}
// A line that is not JSON is dropped without affecting later valid records.
#[test]
fn drain_skips_malformed_lines() {
let dir = TempDir::new().unwrap();
let ch = ProbeChannel::for_workdir(dir.path()).unwrap();
// Seed the (still empty) file with a junk line — `std::fs::write`
// replaces the contents — then append a valid record after it.
std::fs::write(ch.path(), "this is not json\n").unwrap();
ch.write(&sample_probe("after-junk")).unwrap();
let probes = ch.drain();
assert_eq!(probes.len(), 1);
assert_eq!(probes[0].payload_id, "after-junk");
}
// Each `ProbeArg` variant exposes exactly the views its payload supports.
#[test]
fn probe_arg_views() {
let s = ProbeArg::String("hello".into());
assert_eq!(s.as_str(), Some("hello"));
assert_eq!(s.as_bytes(), Some(&b"hello"[..]));
assert_eq!(s.as_int(), None);
let i = ProbeArg::Int(7);
assert_eq!(i.as_str(), None);
assert_eq!(i.as_bytes(), None);
assert_eq!(i.as_int(), Some(7));
let b = ProbeArg::Bytes(vec![b'h', b'i']);
assert_eq!(b.as_str(), Some("hi"));
assert_eq!(b.as_bytes(), Some(&[b'h', b'i'][..]));
}
// A freshly constructed channel has no records.
#[test]
fn empty_channel_drains_to_empty_vec() {
let dir = TempDir::new().unwrap();
let ch = ProbeChannel::for_workdir(dir.path()).unwrap();
assert!(ch.drain().is_empty());
}
// Backward compatibility: `kind` and `witness` fall back to their defaults
// when absent from the JSON line.
#[test]
fn probe_kind_defaults_to_normal_when_field_omitted() {
// Legacy probe-line shape (Phase 06) — no `kind` field on the wire.
let line = r#"{"sink_callee":"os.system","args":[],"captured_at_ns":1,"payload_id":"p"}"#;
let p: SinkProbe = serde_json::from_str(line).unwrap();
assert_eq!(p.kind, ProbeKind::Normal);
assert_eq!(p.witness, ProbeWitness::empty());
}
// A `Crash` kind, including its signal, survives serialise -> file -> parse.
#[test]
fn crash_probe_round_trips_through_channel() {
let dir = TempDir::new().unwrap();
let ch = ProbeChannel::for_workdir(dir.path()).unwrap();
let mut p = sample_probe("crash-test");
p.kind = ProbeKind::Crash { signal: Signal::Sigsegv };
ch.write(&p).unwrap();
let drained = ch.drain();
assert_eq!(drained.len(), 1);
assert!(matches!(
drained[0].kind,
ProbeKind::Crash { signal: Signal::Sigsegv }
));
}
// `from_inputs` routes `args_repr` entries through the scrubber.
#[test]
fn witness_from_inputs_hashes_pii_args() {
let env: Vec<(String, String)> = vec![];
let w = ProbeWitness::from_inputs(
env,
"/tmp/run",
b"payload",
"os.system",
vec!["nyx-stub-secret-aaa-bbb-ccc".to_owned()],
);
// The args_repr entry contained a project-stub-secret literal and
// must be hashed before the witness is serialised.
assert_eq!(w.args_repr.len(), 1);
assert!(
w.args_repr[0].starts_with(policy::SCRUB_HASH_PREFIX),
"args_repr value should be scrubbed; got {}",
w.args_repr[0]
);
assert!(!w.args_repr[0].contains("aaa-bbb-ccc"));
}
// `from_inputs` redacts deny-listed env keys, caps the payload at the
// capture limit, and leaves benign cwd / callee / args untouched.
#[test]
fn witness_from_inputs_redacts_and_truncates() {
// Twice the capture limit, so truncation must kick in.
let huge_payload = vec![0xAB; policy::PAYLOAD_CAPTURE_LIMIT_BYTES * 2];
let env = vec![
("PATH".to_owned(), "/bin".to_owned()),
("AWS_SECRET_ACCESS_KEY".to_owned(), "secret!!!".to_owned()),
];
let w = ProbeWitness::from_inputs(
env,
"/tmp/run",
&huge_payload,
"os.system",
vec!["ls; whoami".to_owned()],
);
assert_eq!(w.cwd, "/tmp/run");
assert_eq!(w.payload_bytes.len(), policy::PAYLOAD_CAPTURE_LIMIT_BYTES);
assert_eq!(w.env_snapshot.get("PATH").map(String::as_str), Some("/bin"));
assert_eq!(
w.env_snapshot.get("AWS_SECRET_ACCESS_KEY").map(String::as_str),
Some(policy::REDACTED_VALUE)
);
assert_eq!(w.args_repr, vec!["ls; whoami".to_owned()]);
assert_eq!(w.callee, "os.system");
}
}