diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index 4833b754..6970a34f 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -705,6 +705,26 @@ pub fn emit(spec: &HarnessSpec) -> Result { return Ok(emit_json_parse_harness(spec)); } + // Phase 11 (Track J.9): UNAUTHORIZED_ID harness. Imports the + // fixture, invokes the entry with the payload as the requested + // owner_id, and emits a `ProbeKind::IdorAccess { caller_id, owner_id }` + // whenever the fixture materialises a non-None record. The + // `IdorBoundaryCrossed` predicate fires when `caller_id != owner_id`. + if spec.expected_cap == crate::labels::Cap::UNAUTHORIZED_ID { + return Ok(emit_unauthorized_id_harness(spec)); + } + + // Phase 11 (Track J.9): DATA_EXFIL harness. Monkey-patches + // `urllib.request.urlopen` (and `urlopen` re-exported from + // `urllib.request` modules) so the outbound URL's host is recorded + // via a `ProbeKind::OutboundNetwork { host }` probe before the + // request is short-circuited (no real network egress). The + // `OutboundHostNotIn` predicate fires when the captured host is + // outside the configured loopback allowlist. + if spec.expected_cap == crate::labels::Cap::DATA_EXFIL { + return Ok(emit_data_exfil_harness(spec)); + } + // Phase 19 (Track M.1): ClassMethod short-circuit. When the spec's // entry_kind is the data-bearing `ClassMethod { class, method }` // variant the harness instantiates the class via its default @@ -2789,6 +2809,234 @@ def _nyx_run(): print("__NYX_SINK_HIT__", flush=True) +if __name__ == "__main__": + _nyx_run() +"# + ); + HarnessSource { + source: body, + filename: "harness.py".to_owned(), + command: vec!["python3".to_owned(), "harness.py".to_owned()], + extra_files: Vec::new(), + entry_subpath: None, + } +} + +/// UNAUTHORIZED_ID IDOR harness for Python. +/// +/// Reads `NYX_PAYLOAD` as the requested `owner_id`, imports the fixture +/// module, and invokes the named entry function with it. When the +/// fixture returns a non-`None` record (i.e. the data store materialised +/// the row without an authorization check) the harness emits a +/// [`crate::dynamic::probe::ProbeKind::IdorAccess`] probe carrying the +/// hard-coded `caller_id = "alice"` and the payload as `owner_id`. The +/// [`crate::dynamic::oracle::ProbePredicate::IdorBoundaryCrossed`] +/// predicate fires whenever `caller_id != owner_id`, so a vuln payload +/// (`bob`) materialises the probe and a benign payload (`alice`) clears +/// the predicate even though both fixtures return a record. +pub fn emit_unauthorized_id_harness(spec: &HarnessSpec) -> HarnessSource { + let probe = probe_shim(); + let module_name = derive_module_name(&spec.entry_file); + let entry_name = if spec.entry_name.is_empty() { + "run".to_owned() + } else { + spec.entry_name.clone() + }; + let body = format!( + r#"#!/usr/bin/env python3 +"""Nyx dynamic harness — UNAUTHORIZED_ID IDOR boundary (Phase 11 / Track J.9).""" +import importlib +import json +import os +import sys +import time + +{probe} + +_NYX_CALLER_ID = "alice" + + +def _nyx_idor_probe(caller_id, owner_id): + rec = {{ + "sink_callee": "__nyx_idor_lookup", + "args": [ + {{"kind": "String", "value": str(caller_id)}}, + {{"kind": "String", "value": str(owner_id)}}, + ], + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {{ + "kind": "IdorAccess", + "caller_id": str(caller_id), + "owner_id": str(owner_id), + }}, + "witness": __nyx_witness("__nyx_idor_lookup", [str(caller_id), str(owner_id)]), + }} + __nyx_emit(rec) + + +def _nyx_idor_via_fixture(payload): + sys.path.insert(0, ".") + try: + mod = importlib.import_module("{module_name}") + except Exception: + return None + fn = getattr(mod, "{entry_name}", None) + if fn is None: + return None + try: + return fn(payload) + except Exception: + return None + + +def _nyx_run(): + payload = os.environ.get("NYX_PAYLOAD", "") + record = _nyx_idor_via_fixture(payload) + if record is not None: + _nyx_idor_probe(_NYX_CALLER_ID, payload) + print("__NYX_SINK_HIT__", flush=True) + sys.stdout.write(json.dumps({{"materialised": record is not None}}) + "\n") + sys.stdout.flush() + + +if __name__ == "__main__": + _nyx_run() +"# + ); + HarnessSource { + source: body, + filename: "harness.py".to_owned(), + command: vec!["python3".to_owned(), "harness.py".to_owned()], + extra_files: Vec::new(), + entry_subpath: None, + } +} + +/// DATA_EXFIL outbound-network harness for Python. +/// +/// Monkey-patches `urllib.request.urlopen` so any outbound HTTP request +/// the fixture initiates is intercepted before the wire I/O: the URL's +/// host is parsed via `urllib.parse.urlparse`, a +/// [`crate::dynamic::probe::ProbeKind::OutboundNetwork`] probe is +/// emitted, and the call returns a benign in-memory stand-in so the +/// fixture's caller never blocks on the network. The +/// [`crate::dynamic::oracle::ProbePredicate::OutboundHostNotIn`] +/// predicate fires when the captured host falls outside the loopback +/// allowlist, so the `attacker.test` vuln payload materialises a probe +/// the predicate matches while the `127.0.0.1` benign control stays +/// clear even though both fixtures call through the same intercepted +/// API. +pub fn emit_data_exfil_harness(spec: &HarnessSpec) -> HarnessSource { + let probe = probe_shim(); + let module_name = derive_module_name(&spec.entry_file); + let entry_name = if spec.entry_name.is_empty() { + "run".to_owned() + } else { + spec.entry_name.clone() + }; + let body = format!( + r#"#!/usr/bin/env python3 +"""Nyx dynamic harness — DATA_EXFIL outbound-host (Phase 11 / Track J.9).""" +import importlib +import io +import json +import os +import sys +import time +import urllib.parse +import urllib.request + +{probe} + + +def _nyx_outbound_probe(host): + rec = {{ + "sink_callee": "__nyx_mock_http", + "args": [{{"kind": "String", "value": str(host)}}], + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {{"kind": "OutboundNetwork", "host": str(host)}}, + "witness": __nyx_witness("__nyx_mock_http", [str(host)]), + }} + __nyx_emit(rec) + + +def _nyx_extract_host(target): + # Accepts either a urllib.request.Request instance or a raw URL str. + raw = None + if hasattr(target, "full_url"): + raw = target.full_url + elif hasattr(target, "get_full_url"): + try: + raw = target.get_full_url() + except Exception: + raw = None + if raw is None: + raw = target + if isinstance(raw, (bytes, bytearray)): + try: + raw = raw.decode("utf-8", "replace") + except Exception: + raw = "" + if not isinstance(raw, str): + raw = str(raw) + try: + parsed = urllib.parse.urlparse(raw) + except Exception: + return "" + host = parsed.hostname + return host if host is not None else "" + + +class _NyxFakeResponse(io.BytesIO): + def __enter__(self): + return self + + def __exit__(self, *exc): + self.close() + return False + + def getcode(self): + return 200 + + def info(self): + return {{}} + + +def _nyx_urlopen(url, data=None, timeout=None, *args, **kwargs): + host = _nyx_extract_host(url) + _nyx_outbound_probe(host) + return _NyxFakeResponse(b"") + + +urllib.request.urlopen = _nyx_urlopen + + +def _nyx_data_exfil_via_fixture(payload): + sys.path.insert(0, ".") + try: + mod = importlib.import_module("{module_name}") + except Exception: + return False + fn = getattr(mod, "{entry_name}", None) + if fn is None: + return False + try: + fn(payload) + except Exception: + return True + return True + + +def _nyx_run(): + payload = os.environ.get("NYX_PAYLOAD", "") + _nyx_data_exfil_via_fixture(payload) + print("__NYX_SINK_HIT__", flush=True) + sys.stdout.write(json.dumps({{"payload": payload}}) + "\n") + sys.stdout.flush() + + if __name__ == "__main__": _nyx_run() "# @@ -4395,4 +4643,173 @@ mod tests { let h = emit_json_parse_harness(&make_json_parse_spec("/abs/path/benign.py", "run")); assert!(h.source.contains("importlib.import_module(\"benign\")")); } + + fn make_unauthorized_id_spec(entry_file: &str, entry_name: &str) -> HarnessSpec { + let mut spec = make_spec(PayloadSlot::Param(0)); + spec.expected_cap = Cap::UNAUTHORIZED_ID; + spec.entry_file = entry_file.to_owned(); + spec.entry_name = entry_name.to_owned(); + spec + } + + #[test] + fn emit_dispatches_to_unauthorized_id_harness_when_cap_is_unauthorized_id() { + let h = emit(&make_unauthorized_id_spec( + "tests/dynamic_fixtures/unauthorized_id/python/vuln.py", + "run", + )) + .unwrap(); + assert!( + h.source.contains("_nyx_idor_probe"), + "dispatcher must short-circuit Cap::UNAUTHORIZED_ID into emit_unauthorized_id_harness: {}", + h.source + ); + assert!( + h.source.contains("\"kind\": \"IdorAccess\""), + "UNAUTHORIZED_ID harness must emit ProbeKind::IdorAccess records", + ); + } + + #[test] + fn emit_unauthorized_id_harness_pins_caller_id() { + let h = emit_unauthorized_id_harness(&make_unauthorized_id_spec( + "tests/dynamic_fixtures/unauthorized_id/python/vuln.py", + "run", + )); + assert!( + h.source.contains("_NYX_CALLER_ID = \"alice\""), + "harness must hard-code caller_id=alice so the predicate fires only when payload ≠ alice", + ); + assert!( + h.source + .contains("_nyx_idor_probe(_NYX_CALLER_ID, payload)"), + "harness must emit the IDOR probe with the hard-coded caller and the payload owner_id", + ); + } + + #[test] + fn emit_unauthorized_id_harness_skips_probe_when_record_is_none() { + let h = emit_unauthorized_id_harness(&make_unauthorized_id_spec( + "tests/dynamic_fixtures/unauthorized_id/python/benign.py", + "run", + )); + assert!( + h.source.contains("if record is not None:"), + "harness must only emit the probe when the fixture materialised a record so the benign fixture (which returns None on boundary cross) does not flip the predicate", + ); + } + + #[test] + fn emit_unauthorized_id_harness_routes_through_fixture_import() { + let h = emit_unauthorized_id_harness(&make_unauthorized_id_spec( + "tests/dynamic_fixtures/unauthorized_id/python/vuln.py", + "run", + )); + assert!( + h.source + .contains("def _nyx_idor_via_fixture(payload):"), + "Python UNAUTHORIZED_ID harness must define the fixture-routing helper", + ); + assert!(h.source.contains("importlib.import_module(\"vuln\")")); + assert!(h.source.contains("getattr(mod, \"run\", None)")); + assert_eq!(h.filename, "harness.py"); + assert!(h.extra_files.is_empty()); + } + + #[test] + fn emit_unauthorized_id_harness_derives_module_name_from_entry_file() { + let h = emit_unauthorized_id_harness(&make_unauthorized_id_spec( + "/abs/path/benign.py", + "run", + )); + assert!(h.source.contains("importlib.import_module(\"benign\")")); + } + + fn make_data_exfil_spec(entry_file: &str, entry_name: &str) -> HarnessSpec { + let mut spec = make_spec(PayloadSlot::Param(0)); + spec.expected_cap = Cap::DATA_EXFIL; + spec.entry_file = entry_file.to_owned(); + spec.entry_name = entry_name.to_owned(); + spec + } + + #[test] + fn emit_dispatches_to_data_exfil_harness_when_cap_is_data_exfil() { + let h = emit(&make_data_exfil_spec( + "tests/dynamic_fixtures/data_exfil/python/vuln.py", + "run", + )) + .unwrap(); + assert!( + h.source.contains("urllib.request.urlopen = _nyx_urlopen"), + "dispatcher must short-circuit Cap::DATA_EXFIL into emit_data_exfil_harness: {}", + h.source + ); + assert!( + h.source.contains("\"kind\": \"OutboundNetwork\""), + "DATA_EXFIL harness must emit ProbeKind::OutboundNetwork records", + ); + } + + #[test] + fn emit_data_exfil_harness_monkey_patches_urlopen() { + let h = emit_data_exfil_harness(&make_data_exfil_spec( + "tests/dynamic_fixtures/data_exfil/python/vuln.py", + "run", + )); + assert!(h.source.contains("urllib.request.urlopen = _nyx_urlopen")); + assert!(h.source.contains("def _nyx_urlopen(url, data=None, timeout=None, *args, **kwargs):")); + assert!( + h.source.contains("class _NyxFakeResponse(io.BytesIO):"), + "harness must return a fake response so the fixture does not block on real network egress", + ); + } + + #[test] + fn emit_data_exfil_harness_parses_host_via_urlparse() { + let h = emit_data_exfil_harness(&make_data_exfil_spec( + "tests/dynamic_fixtures/data_exfil/python/vuln.py", + "run", + )); + assert!(h.source.contains("urllib.parse.urlparse(raw)")); + assert!(h.source.contains("host = parsed.hostname")); + } + + #[test] + fn emit_data_exfil_harness_handles_request_instance_via_full_url() { + let h = emit_data_exfil_harness(&make_data_exfil_spec( + "tests/dynamic_fixtures/data_exfil/python/vuln.py", + "run", + )); + assert!( + h.source.contains("hasattr(target, \"full_url\")"), + "harness must accept a urllib.request.Request instance too (not only bare URL strings)", + ); + } + + #[test] + fn emit_data_exfil_harness_routes_through_fixture_import() { + let h = emit_data_exfil_harness(&make_data_exfil_spec( + "tests/dynamic_fixtures/data_exfil/python/vuln.py", + "run", + )); + assert!( + h.source + .contains("def _nyx_data_exfil_via_fixture(payload):"), + "Python DATA_EXFIL harness must define the fixture-routing helper", + ); + assert!(h.source.contains("importlib.import_module(\"vuln\")")); + assert!(h.source.contains("getattr(mod, \"run\", None)")); + assert_eq!(h.filename, "harness.py"); + assert!(h.extra_files.is_empty()); + } + + #[test] + fn emit_data_exfil_harness_derives_module_name_from_entry_file() { + let h = emit_data_exfil_harness(&make_data_exfil_spec( + "/abs/path/benign.py", + "run", + )); + assert!(h.source.contains("importlib.import_module(\"benign\")")); + } } diff --git a/tests/data_exfil_corpus.rs b/tests/data_exfil_corpus.rs index cd180d10..24769b34 100644 --- a/tests/data_exfil_corpus.rs +++ b/tests/data_exfil_corpus.rs @@ -13,6 +13,8 @@ #![cfg(feature = "dynamic")] +mod common; + use nyx_scanner::dynamic::corpus::{payloads_for_lang, resolve_benign_control_lang}; use nyx_scanner::dynamic::oracle::{Oracle, ProbePredicate, oracle_fired}; use nyx_scanner::dynamic::probe::{ProbeKind, ProbeWitness, SinkProbe}; @@ -110,3 +112,145 @@ fn outbound_predicate_fires_off_allowlist() { )); assert!(!oracle_fired(&oracle, &outcome(), &[])); } + +/// Drives the per-language DATA_EXFIL fixtures through `run_spec` and +/// asserts the vuln payload Confirms while the benign control does not. +/// Both fixtures share a single entry function (`run`) and the harness +/// monkey-patches `urllib.request.urlopen` so no real network egress +/// happens — the probe captures the parsed host before the request is +/// short-circuited. +mod e2e_data_exfil { + use crate::common::fixture_harness::FIXTURE_LOCK; + use nyx_scanner::dynamic::runner::{RunError, RunOutcome, run_spec}; + use nyx_scanner::dynamic::sandbox::{SandboxBackend, SandboxOptions}; + use nyx_scanner::dynamic::spec::{ + EntryKind, HarnessSpec, PayloadSlot, SpecDerivationStrategy, default_toolchain_id, + }; + use nyx_scanner::evidence::DifferentialVerdict; + use nyx_scanner::labels::Cap; + use nyx_scanner::symbol::Lang; + use std::path::PathBuf; + use std::process::Command; + use tempfile::TempDir; + + fn command_available(bin: &str) -> bool { + Command::new(bin) + .arg("--version") + .output() + .map(|o| o.status.success()) + .unwrap_or(false) + } + + fn build_spec(lang: Lang, fixture: &str, entry_name: &str) -> (HarnessSpec, TempDir) { + let fixture_src = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/dynamic_fixtures/data_exfil") + .join(match lang { + Lang::Python => "python", + _ => unreachable!("DATA_EXFIL e2e currently covers Python only"), + }) + .join(fixture); + let tmp = TempDir::new().expect("create tempdir"); + let dst = tmp.path().join(fixture); + std::fs::copy(&fixture_src, &dst).expect("copy fixture into tempdir"); + + let entry_file = dst.to_string_lossy().into_owned(); + let mut digest = blake3::Hasher::new(); + digest.update(b"e2e-data-exfil|"); + digest.update(fixture.as_bytes()); + let spec_hash = format!("{:016x}", { + let bytes = digest.finalize(); + u64::from_le_bytes(bytes.as_bytes()[..8].try_into().unwrap()) + }); + + let spec = HarnessSpec { + finding_id: spec_hash.clone(), + entry_file: entry_file.clone(), + entry_name: entry_name.to_owned(), + entry_kind: EntryKind::Function, + lang, + toolchain_id: default_toolchain_id(lang).into(), + payload_slot: PayloadSlot::Param(0), + expected_cap: Cap::DATA_EXFIL, + constraint_hints: vec![], + sink_file: entry_file, + sink_line: 1, + spec_hash: spec_hash.clone(), + derivation: SpecDerivationStrategy::FromFlowSteps, + stubs_required: vec![], + framework: None, + java_toolchain: nyx_scanner::dynamic::spec::JavaToolchain::default(), + }; + + (spec, tmp) + } + + fn run(lang: Lang, fixture: &str, entry_name: &str) -> Option { + let required = match lang { + Lang::Python => "python3", + _ => unreachable!("DATA_EXFIL e2e currently covers Python only"), + }; + if !command_available(required) { + eprintln!("SKIP {lang:?} {fixture}: missing toolchain {required}"); + return None; + } + let _guard = FIXTURE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + let (spec, _tmp) = build_spec(lang, fixture, entry_name); + let opts = SandboxOptions { + backend: SandboxBackend::Process, + ..SandboxOptions::default() + }; + match run_spec(&spec, &opts) { + Ok(outcome) => Some(outcome), + Err(RunError::BuildFailed { stderr, attempts }) => { + eprintln!( + "SKIP {lang:?} {fixture}: harness build failed after {attempts} attempts: {stderr}", + ); + None + } + Err(e) => panic!("run_spec({lang:?} {fixture}) errored: {e:?}"), + } + } + + /// Pointed at the vuln fixture: + /// + /// * `attacker.test` → fixture sends to `attacker.test` → probe captures + /// `host = "attacker.test"` → `OutboundHostNotIn` fires. + /// * `127.0.0.1` → fixture sends to loopback → probe captures + /// `host = "127.0.0.1"` → predicate clears (loopback in allowlist). + /// + /// Differential lands at `Confirmed`. + #[test] + fn python_vuln_confirms_via_run_spec() { + let Some(outcome) = run(Lang::Python, "vuln.py", "run") else { + return; + }; + assert!( + outcome.triggered_by.is_some(), + "Python DATA_EXFIL vuln must confirm via run_spec; got {outcome:?}", + ); + let diff = outcome + .differential + .as_ref() + .expect("confirmed run must carry a DifferentialOutcome"); + assert_eq!(diff.verdict, DifferentialVerdict::Confirmed); + } + + /// Pointed at the benign fixture (which rejects non-allowlist hosts + /// before calling urlopen): + /// + /// * `attacker.test` → fixture rejects → no urlopen → no probe. + /// * `127.0.0.1` → fixture sends to loopback → probe(host = "127.0.0.1") + /// → predicate clears. + /// + /// Neither payload fires; differential lands at `NotConfirmed`. + #[test] + fn python_benign_does_not_confirm_via_run_spec() { + let Some(outcome) = run(Lang::Python, "benign.py", "run") else { + return; + }; + assert!( + outcome.triggered_by.is_none(), + "Python DATA_EXFIL benign control must not confirm via run_spec; got {outcome:?}", + ); + } +} diff --git a/tests/unauthorized_id_corpus.rs b/tests/unauthorized_id_corpus.rs index 8a1b040a..efbe1ea0 100644 --- a/tests/unauthorized_id_corpus.rs +++ b/tests/unauthorized_id_corpus.rs @@ -11,6 +11,8 @@ #![cfg(feature = "dynamic")] +mod common; + use nyx_scanner::dynamic::corpus::{payloads_for_lang, resolve_benign_control_lang}; use nyx_scanner::dynamic::oracle::{Oracle, ProbePredicate, oracle_fired}; use nyx_scanner::dynamic::probe::{ProbeKind, ProbeWitness, SinkProbe}; @@ -102,3 +104,146 @@ fn idor_predicate_fires_on_boundary_crossing() { )); assert!(!oracle_fired(&oracle, &outcome(), &[])); } + +/// Drives the per-language UNAUTHORIZED_ID fixtures through `run_spec` +/// and asserts the vuln payload Confirms while the benign control does +/// not. Each fixture pair shares a single entry function (`run`); the +/// harness emitter resolves the payload-vs-record boundary via the +/// hard-coded `caller_id = "alice"` it embeds in the probe shim. +mod e2e_unauthorized_id { + use crate::common::fixture_harness::FIXTURE_LOCK; + use nyx_scanner::dynamic::runner::{RunError, RunOutcome, run_spec}; + use nyx_scanner::dynamic::sandbox::{SandboxBackend, SandboxOptions}; + use nyx_scanner::dynamic::spec::{ + EntryKind, HarnessSpec, PayloadSlot, SpecDerivationStrategy, default_toolchain_id, + }; + use nyx_scanner::evidence::DifferentialVerdict; + use nyx_scanner::labels::Cap; + use nyx_scanner::symbol::Lang; + use std::path::PathBuf; + use std::process::Command; + use tempfile::TempDir; + + fn command_available(bin: &str) -> bool { + Command::new(bin) + .arg("--version") + .output() + .map(|o| o.status.success()) + .unwrap_or(false) + } + + fn build_spec(lang: Lang, fixture: &str, entry_name: &str) -> (HarnessSpec, TempDir) { + let fixture_src = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/dynamic_fixtures/unauthorized_id") + .join(match lang { + Lang::Python => "python", + _ => unreachable!("UNAUTHORIZED_ID e2e currently covers Python only"), + }) + .join(fixture); + let tmp = TempDir::new().expect("create tempdir"); + let dst = tmp.path().join(fixture); + std::fs::copy(&fixture_src, &dst).expect("copy fixture into tempdir"); + + let entry_file = dst.to_string_lossy().into_owned(); + let mut digest = blake3::Hasher::new(); + digest.update(b"e2e-unauthorized-id|"); + digest.update(fixture.as_bytes()); + let spec_hash = format!("{:016x}", { + let bytes = digest.finalize(); + u64::from_le_bytes(bytes.as_bytes()[..8].try_into().unwrap()) + }); + + let spec = HarnessSpec { + finding_id: spec_hash.clone(), + entry_file: entry_file.clone(), + entry_name: entry_name.to_owned(), + entry_kind: EntryKind::Function, + lang, + toolchain_id: default_toolchain_id(lang).into(), + payload_slot: PayloadSlot::Param(0), + expected_cap: Cap::UNAUTHORIZED_ID, + constraint_hints: vec![], + sink_file: entry_file, + sink_line: 1, + spec_hash: spec_hash.clone(), + derivation: SpecDerivationStrategy::FromFlowSteps, + stubs_required: vec![], + framework: None, + java_toolchain: nyx_scanner::dynamic::spec::JavaToolchain::default(), + }; + + (spec, tmp) + } + + fn run(lang: Lang, fixture: &str, entry_name: &str) -> Option { + let required = match lang { + Lang::Python => "python3", + _ => unreachable!("UNAUTHORIZED_ID e2e currently covers Python only"), + }; + if !command_available(required) { + eprintln!("SKIP {lang:?} {fixture}: missing toolchain {required}"); + return None; + } + let _guard = FIXTURE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + let (spec, _tmp) = build_spec(lang, fixture, entry_name); + let opts = SandboxOptions { + backend: SandboxBackend::Process, + ..SandboxOptions::default() + }; + match run_spec(&spec, &opts) { + Ok(outcome) => Some(outcome), + Err(RunError::BuildFailed { stderr, attempts }) => { + eprintln!( + "SKIP {lang:?} {fixture}: harness build failed after {attempts} attempts: {stderr}", + ); + None + } + Err(e) => panic!("run_spec({lang:?} {fixture}) errored: {e:?}"), + } + } + + /// The runner draws the curated payload pair (vuln "bob" + benign "alice") + /// from `payloads_for_lang(Cap::UNAUTHORIZED_ID, Lang::Python)`. Pointed at + /// the vuln fixture: + /// + /// * `bob` → fixture returns bob's record → probe(caller=alice, owner=bob) + /// → `IdorBoundaryCrossed` fires. + /// * `alice` → fixture returns alice's record → probe(caller=alice, + /// owner=alice) → predicate clears. + /// + /// The vuln-vs-benign differential lands at `Confirmed`. + #[test] + fn python_vuln_confirms_via_run_spec() { + let Some(outcome) = run(Lang::Python, "vuln.py", "run") else { + return; + }; + assert!( + outcome.triggered_by.is_some(), + "Python UNAUTHORIZED_ID vuln must confirm via run_spec; got {outcome:?}", + ); + let diff = outcome + .differential + .as_ref() + .expect("confirmed run must carry a DifferentialOutcome"); + assert_eq!(diff.verdict, DifferentialVerdict::Confirmed); + } + + /// Pointed at the benign fixture: + /// + /// * `bob` → fixture rejects (returns None) → no probe. + /// * `alice` → fixture returns alice's record → probe(alice, alice) → + /// predicate clears. + /// + /// Neither payload fires the predicate; the differential lands at + /// `NotConfirmed`. + #[test] + fn python_benign_does_not_confirm_via_run_spec() { + let Some(outcome) = run(Lang::Python, "benign.py", "run") else { + return; + }; + assert!( + outcome.triggered_by.is_none(), + "Python UNAUTHORIZED_ID benign control must not confirm via run_spec; got {outcome:?}", + ); + } +}