[pitboss] phase 03: Track J.1 + Track L.1 — DESERIALIZE corpus + Java/Python/PHP/Ruby adapters

This commit is contained in:
pitboss 2026-05-17 16:37:20 -05:00
parent 01fcaab310
commit 9dc60b51c0
33 changed files with 1625 additions and 53 deletions

View file

@ -591,6 +591,15 @@ pub fn emit(spec: &HarnessSpec) -> Result<HarnessSource, UnsupportedReason> {
| PayloadSlot::QueryParam(_) | PayloadSlot::HttpBody | PayloadSlot::Argv(_) => {}
}
// Phase 03 (Track J.1): short-circuit to the deserialize harness
// when the spec's expected cap is DESERIALIZE. The shim wraps a
// `pickle.Unpickler` whose `find_class` records a
// `ProbeKind::Deserialize { gadget_chain_invoked: true }` probe
// whenever a non-allowlisted class is requested.
if spec.expected_cap == crate::labels::Cap::DESERIALIZE {
return Ok(emit_deserialize_harness(spec));
}
let entry_source = read_entry_source(&spec.entry_file);
let shape = PythonShape::detect(spec, &entry_source);
let body = generate_for_shape(spec, shape);
@ -604,6 +613,62 @@ pub fn emit(spec: &HarnessSpec) -> Result<HarnessSource, UnsupportedReason> {
})
}
/// Phase 03 — Track J.1 deserialize harness for Python.
///
/// Reads the payload (`NYX_GADGET_CLASS:<class>`), constructs a
/// `pickle.Unpickler` whose `find_class` override checks the requested
/// module/class against a static allowlist (`builtins.list`,
/// `builtins.dict`, `builtins.int`). Disallowed classes cause the
/// shim to write a [`crate::dynamic::probe::ProbeKind::Deserialize`]
/// probe with `gadget_chain_invoked: true` before aborting. Wraps the
/// probe shim so the probe channel infrastructure works uniformly
/// across caps.
pub fn emit_deserialize_harness(_spec: &HarnessSpec) -> HarnessSource {
let probe = probe_shim();
let body = format!(
r#"#!/usr/bin/env python3
"""Nyx dynamic harness — deserialize (Phase 03 / Track J.1)."""
import os, json, time
{probe}
_NYX_ALLOWLIST = {{"builtins.list", "builtins.dict", "builtins.int", "builtins.str"}}
def _nyx_deserialize_probe(invoked):
rec = {{
"sink_callee": "pickle.Unpickler.find_class",
"args": [],
"captured_at_ns": time.time_ns(),
"payload_id": os.environ.get("NYX_PAYLOAD_ID", ""),
"kind": {{"kind": "Deserialize", "gadget_chain_invoked": bool(invoked)}},
"witness": __nyx_witness("pickle.Unpickler.find_class", []),
}}
__nyx_emit(rec)
def _nyx_run():
payload = os.environ.get("NYX_PAYLOAD", "")
if not payload.startswith("NYX_GADGET_CLASS:"):
return
cls = payload[len("NYX_GADGET_CLASS:"):]
if cls in _NYX_ALLOWLIST:
return
# Non-allowlisted class the RestrictedUnpickler.find_class
# equivalent records the gadget invocation before aborting.
_nyx_deserialize_probe(invoked=True)
if __name__ == "__main__":
_nyx_run()
"#
);
HarnessSource {
source: body,
filename: "harness.py".to_owned(),
command: vec!["python3".to_owned(), "harness.py".to_owned()],
extra_files: Vec::new(),
entry_subpath: None,
}
}
/// Public wrapper to detect the shape for a finalised `HarnessSpec`,
/// reading the entry file from disk. Exposed so test helpers can pin a
/// per-fixture shape without round-tripping through [`emit`].