[pitboss/grind] deferred session-0005 (20260522T163126Z-7d60)

This commit is contained in:
pitboss 2026-05-22 13:39:14 -05:00
parent 0d4ab22c4c
commit 94a3d12a4e
3 changed files with 706 additions and 0 deletions

View file

@ -705,6 +705,26 @@ pub fn emit(spec: &HarnessSpec) -> Result<HarnessSource, UnsupportedReason> {
return Ok(emit_json_parse_harness(spec));
}
// Phase 11 (Track J.9): UNAUTHORIZED_ID harness. Imports the
// fixture, invokes the entry with the payload as the requested
// owner_id, and emits a `ProbeKind::IdorAccess { caller_id, owner_id }`
// whenever the fixture materialises a non-None record. The
// `IdorBoundaryCrossed` predicate fires when `caller_id != owner_id`.
if spec.expected_cap == crate::labels::Cap::UNAUTHORIZED_ID {
return Ok(emit_unauthorized_id_harness(spec));
}
// Phase 11 (Track J.9): DATA_EXFIL harness. Monkey-patches
// `urllib.request.urlopen` (and `urlopen` re-exported from
// `urllib.request` modules) so the outbound URL's host is recorded
// via a `ProbeKind::OutboundNetwork { host }` probe before the
// request is short-circuited (no real network egress). The
// `OutboundHostNotIn` predicate fires when the captured host is
// outside the configured loopback allowlist.
if spec.expected_cap == crate::labels::Cap::DATA_EXFIL {
return Ok(emit_data_exfil_harness(spec));
}
// Phase 19 (Track M.1): ClassMethod short-circuit. When the spec's
// entry_kind is the data-bearing `ClassMethod { class, method }`
// variant the harness instantiates the class via its default
@ -2789,6 +2809,234 @@ def _nyx_run():
print("__NYX_SINK_HIT__", flush=True)
if __name__ == "__main__":
_nyx_run()
"#
);
HarnessSource {
source: body,
filename: "harness.py".to_owned(),
command: vec!["python3".to_owned(), "harness.py".to_owned()],
extra_files: Vec::new(),
entry_subpath: None,
}
}
/// UNAUTHORIZED_ID IDOR harness for Python.
///
/// Reads `NYX_PAYLOAD` as the requested `owner_id`, imports the fixture
/// module, and invokes the named entry function with it. When the
/// fixture returns a non-`None` record (i.e. the data store materialised
/// the row without an authorization check) the harness emits a
/// [`crate::dynamic::probe::ProbeKind::IdorAccess`] probe carrying the
/// hard-coded `caller_id = "alice"` and the payload as `owner_id`. The
/// [`crate::dynamic::oracle::ProbePredicate::IdorBoundaryCrossed`]
/// predicate fires whenever `caller_id != owner_id`, so a vuln payload
/// (`bob`) materialises the probe and a benign payload (`alice`) clears
/// the predicate even though both fixtures return a record.
pub fn emit_unauthorized_id_harness(spec: &HarnessSpec) -> HarnessSource {
let probe = probe_shim();
let module_name = derive_module_name(&spec.entry_file);
let entry_name = if spec.entry_name.is_empty() {
"run".to_owned()
} else {
spec.entry_name.clone()
};
let body = format!(
r#"#!/usr/bin/env python3
"""Nyx dynamic harness — UNAUTHORIZED_ID IDOR boundary (Phase 11 / Track J.9)."""
import importlib
import json
import os
import sys
import time
{probe}
_NYX_CALLER_ID = "alice"
def _nyx_idor_probe(caller_id, owner_id):
rec = {{
"sink_callee": "__nyx_idor_lookup",
"args": [
{{"kind": "String", "value": str(caller_id)}},
{{"kind": "String", "value": str(owner_id)}},
],
"captured_at_ns": time.time_ns(),
"payload_id": os.environ.get("NYX_PAYLOAD_ID", ""),
"kind": {{
"kind": "IdorAccess",
"caller_id": str(caller_id),
"owner_id": str(owner_id),
}},
"witness": __nyx_witness("__nyx_idor_lookup", [str(caller_id), str(owner_id)]),
}}
__nyx_emit(rec)
def _nyx_idor_via_fixture(payload):
sys.path.insert(0, ".")
try:
mod = importlib.import_module("{module_name}")
except Exception:
return None
fn = getattr(mod, "{entry_name}", None)
if fn is None:
return None
try:
return fn(payload)
except Exception:
return None
def _nyx_run():
payload = os.environ.get("NYX_PAYLOAD", "")
record = _nyx_idor_via_fixture(payload)
if record is not None:
_nyx_idor_probe(_NYX_CALLER_ID, payload)
print("__NYX_SINK_HIT__", flush=True)
sys.stdout.write(json.dumps({{"materialised": record is not None}}) + "\n")
sys.stdout.flush()
if __name__ == "__main__":
_nyx_run()
"#
);
HarnessSource {
source: body,
filename: "harness.py".to_owned(),
command: vec!["python3".to_owned(), "harness.py".to_owned()],
extra_files: Vec::new(),
entry_subpath: None,
}
}
/// DATA_EXFIL outbound-network harness for Python.
///
/// Monkey-patches `urllib.request.urlopen` so any outbound HTTP request
/// the fixture initiates is intercepted before the wire I/O: the URL's
/// host is parsed via `urllib.parse.urlparse`, a
/// [`crate::dynamic::probe::ProbeKind::OutboundNetwork`] probe is
/// emitted, and the call returns a benign in-memory stand-in so the
/// fixture's caller never blocks on the network. The
/// [`crate::dynamic::oracle::ProbePredicate::OutboundHostNotIn`]
/// predicate fires when the captured host falls outside the loopback
/// allowlist, so the `attacker.test` vuln payload materialises a probe
/// the predicate matches while the `127.0.0.1` benign control stays
/// clear even though both fixtures call through the same intercepted
/// API.
pub fn emit_data_exfil_harness(spec: &HarnessSpec) -> HarnessSource {
let probe = probe_shim();
let module_name = derive_module_name(&spec.entry_file);
let entry_name = if spec.entry_name.is_empty() {
"run".to_owned()
} else {
spec.entry_name.clone()
};
let body = format!(
r#"#!/usr/bin/env python3
"""Nyx dynamic harness — DATA_EXFIL outbound-host (Phase 11 / Track J.9)."""
import importlib
import io
import json
import os
import sys
import time
import urllib.parse
import urllib.request
{probe}
def _nyx_outbound_probe(host):
rec = {{
"sink_callee": "__nyx_mock_http",
"args": [{{"kind": "String", "value": str(host)}}],
"captured_at_ns": time.time_ns(),
"payload_id": os.environ.get("NYX_PAYLOAD_ID", ""),
"kind": {{"kind": "OutboundNetwork", "host": str(host)}},
"witness": __nyx_witness("__nyx_mock_http", [str(host)]),
}}
__nyx_emit(rec)
def _nyx_extract_host(target):
# Accepts either a urllib.request.Request instance or a raw URL str.
raw = None
if hasattr(target, "full_url"):
raw = target.full_url
elif hasattr(target, "get_full_url"):
try:
raw = target.get_full_url()
except Exception:
raw = None
if raw is None:
raw = target
if isinstance(raw, (bytes, bytearray)):
try:
raw = raw.decode("utf-8", "replace")
except Exception:
raw = ""
if not isinstance(raw, str):
raw = str(raw)
try:
parsed = urllib.parse.urlparse(raw)
except Exception:
return ""
host = parsed.hostname
return host if host is not None else ""
class _NyxFakeResponse(io.BytesIO):
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
return False
def getcode(self):
return 200
def info(self):
return {{}}
def _nyx_urlopen(url, data=None, timeout=None, *args, **kwargs):
host = _nyx_extract_host(url)
_nyx_outbound_probe(host)
return _NyxFakeResponse(b"")
urllib.request.urlopen = _nyx_urlopen
def _nyx_data_exfil_via_fixture(payload):
sys.path.insert(0, ".")
try:
mod = importlib.import_module("{module_name}")
except Exception:
return False
fn = getattr(mod, "{entry_name}", None)
if fn is None:
return False
try:
fn(payload)
except Exception:
return True
return True
def _nyx_run():
payload = os.environ.get("NYX_PAYLOAD", "")
_nyx_data_exfil_via_fixture(payload)
print("__NYX_SINK_HIT__", flush=True)
sys.stdout.write(json.dumps({{"payload": payload}}) + "\n")
sys.stdout.flush()
if __name__ == "__main__":
_nyx_run()
"#
@ -4395,4 +4643,173 @@ mod tests {
let h = emit_json_parse_harness(&make_json_parse_spec("/abs/path/benign.py", "run"));
assert!(h.source.contains("importlib.import_module(\"benign\")"));
}
fn make_unauthorized_id_spec(entry_file: &str, entry_name: &str) -> HarnessSpec {
let mut spec = make_spec(PayloadSlot::Param(0));
spec.expected_cap = Cap::UNAUTHORIZED_ID;
spec.entry_file = entry_file.to_owned();
spec.entry_name = entry_name.to_owned();
spec
}
#[test]
fn emit_dispatches_to_unauthorized_id_harness_when_cap_is_unauthorized_id() {
let h = emit(&make_unauthorized_id_spec(
"tests/dynamic_fixtures/unauthorized_id/python/vuln.py",
"run",
))
.unwrap();
assert!(
h.source.contains("_nyx_idor_probe"),
"dispatcher must short-circuit Cap::UNAUTHORIZED_ID into emit_unauthorized_id_harness: {}",
h.source
);
assert!(
h.source.contains("\"kind\": \"IdorAccess\""),
"UNAUTHORIZED_ID harness must emit ProbeKind::IdorAccess records",
);
}
#[test]
fn emit_unauthorized_id_harness_pins_caller_id() {
let h = emit_unauthorized_id_harness(&make_unauthorized_id_spec(
"tests/dynamic_fixtures/unauthorized_id/python/vuln.py",
"run",
));
assert!(
h.source.contains("_NYX_CALLER_ID = \"alice\""),
"harness must hard-code caller_id=alice so the predicate fires only when payload ≠ alice",
);
assert!(
h.source
.contains("_nyx_idor_probe(_NYX_CALLER_ID, payload)"),
"harness must emit the IDOR probe with the hard-coded caller and the payload owner_id",
);
}
#[test]
fn emit_unauthorized_id_harness_skips_probe_when_record_is_none() {
let h = emit_unauthorized_id_harness(&make_unauthorized_id_spec(
"tests/dynamic_fixtures/unauthorized_id/python/benign.py",
"run",
));
assert!(
h.source.contains("if record is not None:"),
"harness must only emit the probe when the fixture materialised a record so the benign fixture (which returns None on boundary cross) does not flip the predicate",
);
}
#[test]
fn emit_unauthorized_id_harness_routes_through_fixture_import() {
let h = emit_unauthorized_id_harness(&make_unauthorized_id_spec(
"tests/dynamic_fixtures/unauthorized_id/python/vuln.py",
"run",
));
assert!(
h.source
.contains("def _nyx_idor_via_fixture(payload):"),
"Python UNAUTHORIZED_ID harness must define the fixture-routing helper",
);
assert!(h.source.contains("importlib.import_module(\"vuln\")"));
assert!(h.source.contains("getattr(mod, \"run\", None)"));
assert_eq!(h.filename, "harness.py");
assert!(h.extra_files.is_empty());
}
#[test]
fn emit_unauthorized_id_harness_derives_module_name_from_entry_file() {
let h = emit_unauthorized_id_harness(&make_unauthorized_id_spec(
"/abs/path/benign.py",
"run",
));
assert!(h.source.contains("importlib.import_module(\"benign\")"));
}
fn make_data_exfil_spec(entry_file: &str, entry_name: &str) -> HarnessSpec {
let mut spec = make_spec(PayloadSlot::Param(0));
spec.expected_cap = Cap::DATA_EXFIL;
spec.entry_file = entry_file.to_owned();
spec.entry_name = entry_name.to_owned();
spec
}
#[test]
fn emit_dispatches_to_data_exfil_harness_when_cap_is_data_exfil() {
let h = emit(&make_data_exfil_spec(
"tests/dynamic_fixtures/data_exfil/python/vuln.py",
"run",
))
.unwrap();
assert!(
h.source.contains("urllib.request.urlopen = _nyx_urlopen"),
"dispatcher must short-circuit Cap::DATA_EXFIL into emit_data_exfil_harness: {}",
h.source
);
assert!(
h.source.contains("\"kind\": \"OutboundNetwork\""),
"DATA_EXFIL harness must emit ProbeKind::OutboundNetwork records",
);
}
#[test]
fn emit_data_exfil_harness_monkey_patches_urlopen() {
let h = emit_data_exfil_harness(&make_data_exfil_spec(
"tests/dynamic_fixtures/data_exfil/python/vuln.py",
"run",
));
assert!(h.source.contains("urllib.request.urlopen = _nyx_urlopen"));
assert!(h.source.contains("def _nyx_urlopen(url, data=None, timeout=None, *args, **kwargs):"));
assert!(
h.source.contains("class _NyxFakeResponse(io.BytesIO):"),
"harness must return a fake response so the fixture does not block on real network egress",
);
}
#[test]
fn emit_data_exfil_harness_parses_host_via_urlparse() {
let h = emit_data_exfil_harness(&make_data_exfil_spec(
"tests/dynamic_fixtures/data_exfil/python/vuln.py",
"run",
));
assert!(h.source.contains("urllib.parse.urlparse(raw)"));
assert!(h.source.contains("host = parsed.hostname"));
}
#[test]
fn emit_data_exfil_harness_handles_request_instance_via_full_url() {
let h = emit_data_exfil_harness(&make_data_exfil_spec(
"tests/dynamic_fixtures/data_exfil/python/vuln.py",
"run",
));
assert!(
h.source.contains("hasattr(target, \"full_url\")"),
"harness must accept a urllib.request.Request instance too (not only bare URL strings)",
);
}
#[test]
fn emit_data_exfil_harness_routes_through_fixture_import() {
let h = emit_data_exfil_harness(&make_data_exfil_spec(
"tests/dynamic_fixtures/data_exfil/python/vuln.py",
"run",
));
assert!(
h.source
.contains("def _nyx_data_exfil_via_fixture(payload):"),
"Python DATA_EXFIL harness must define the fixture-routing helper",
);
assert!(h.source.contains("importlib.import_module(\"vuln\")"));
assert!(h.source.contains("getattr(mod, \"run\", None)"));
assert_eq!(h.filename, "harness.py");
assert!(h.extra_files.is_empty());
}
#[test]
fn emit_data_exfil_harness_derives_module_name_from_entry_file() {
let h = emit_data_exfil_harness(&make_data_exfil_spec(
"/abs/path/benign.py",
"run",
));
assert!(h.source.contains("importlib.import_module(\"benign\")"));
}
}