[pitboss/grind] deferred session-0002 (20260520T233019Z-6958)

This commit is contained in:
pitboss 2026-05-20 20:26:13 -05:00
parent 3b49b4d4b5
commit a1a8a2140c
5 changed files with 335 additions and 36 deletions

View file

@ -1265,24 +1265,36 @@ fn indent_lines(src: &str, prefix: &str) -> String {
/// Phase 03 — Track J.1 deserialize harness for Python.
///
/// Reads the payload (`NYX_GADGET_CLASS:<class>`), constructs a
/// `pickle.Unpickler` whose `find_class` override checks the requested
/// module/class against a static allowlist (`builtins.list`,
/// `builtins.dict`, `builtins.int`). Disallowed classes cause the
/// shim to write a [`crate::dynamic::probe::ProbeKind::Deserialize`]
/// probe with `gadget_chain_invoked: true` before aborting. Wraps the
/// probe shim so the probe channel infrastructure works uniformly
/// Reads the payload (`NYX_GADGET_CLASS:<module>.<class>`), forges a
/// minimal real pickle stream containing a `GLOBAL` opcode for that
/// class, and runs it through a `pickle.Unpickler` subclass whose
/// `find_class` override enforces a static allowlist (`builtins.list`,
/// `builtins.dict`, `builtins.int`, `builtins.str`). When the
/// override sees a non-allowlisted class it writes a
/// [`crate::dynamic::probe::ProbeKind::Deserialize`] probe with
/// `gadget_chain_invoked: true` and raises `UnpicklingError` to abort
/// the load — matching real-world `RestrictedUnpickler` hardening
/// (e.g. RestrictedPython, MITRE-CWE-502 mitigation guidance). Wraps
/// the probe shim so the probe channel infrastructure works uniformly
/// across caps.
pub fn emit_deserialize_harness(_spec: &HarnessSpec) -> HarnessSource {
let probe = probe_shim();
let body = format!(
r#"#!/usr/bin/env python3
"""Nyx dynamic harness — deserialize (Phase 03 / Track J.1)."""
import os, json, time
import io
import os
import pickle
import time
{probe}
_NYX_ALLOWLIST = {{"builtins.list", "builtins.dict", "builtins.int", "builtins.str"}}
_NYX_ALLOWLIST = {{
("builtins", "list"),
("builtins", "dict"),
("builtins", "int"),
("builtins", "str"),
}}
def _nyx_deserialize_probe(invoked):
rec = {{
@ -1295,16 +1307,48 @@ def _nyx_deserialize_probe(invoked):
}}
__nyx_emit(rec)
class _NyxRestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
if (module, name) not in _NYX_ALLOWLIST:
_nyx_deserialize_probe(invoked=True)
raise pickle.UnpicklingError(
"Nyx restricted-unpickler blocked %s.%s" % (module, name)
)
return super().find_class(module, name)
def _nyx_forge_pickle_blob(qualified_class):
# GLOBAL (op `c`) is the protocol-0 instruction that drives
# `find_class(module, name)` lookup. Encoding: `c<module>\n<name>\n.`
# the trailing `.` is STOP. rpartition on the last `.` splits a
# qualified name like `nyx.gadget.RCE` into module=`nyx.gadget`,
# name=`RCE`; a bare name without a dot lands in `builtins`.
module, sep, name = qualified_class.rpartition(".")
if not sep:
module, name = "builtins", qualified_class
return (
b"c"
+ module.encode("utf-8")
+ b"\n"
+ name.encode("utf-8")
+ b"\n."
)
def _nyx_run():
payload = os.environ.get("NYX_PAYLOAD", "")
if not payload.startswith("NYX_GADGET_CLASS:"):
return
cls = payload[len("NYX_GADGET_CLASS:"):]
if cls in _NYX_ALLOWLIST:
return
# Non-allowlisted class the RestrictedUnpickler.find_class
# equivalent records the gadget invocation before aborting.
_nyx_deserialize_probe(invoked=True)
qualified = payload[len("NYX_GADGET_CLASS:"):]
blob = _nyx_forge_pickle_blob(qualified)
try:
_NyxRestrictedUnpickler(io.BytesIO(blob)).load()
except pickle.UnpicklingError:
# Restricted block probe already written above.
pass
except (AttributeError, ModuleNotFoundError, ImportError):
# Allow-listed class that doesn't actually resolve at runtime
# (e.g. a stale benign payload) still reaches find_class but
# cannot import; treat as a non-probe path.
pass
if __name__ == "__main__":
_nyx_run()