[pitboss] phase 08: Track C.4 + C.5 — SinkCrash oracle + per-probe witness capture

This commit is contained in:
pitboss 2026-05-14 13:10:22 -05:00
parent 4eccbd48b4
commit 93eb98edda
21 changed files with 1988 additions and 115 deletions

View file

@ -51,12 +51,66 @@ impl LangEmitter for PythonEmitter {
/// configured a probe channel.
pub fn probe_shim() -> &'static str {
r#"
# __nyx_probe shim (Phase 06 Track C.1)
def __nyx_probe(sink_callee, *args):
import os, time, json
# __nyx_probe shim (Phase 06 Track C.1, Phase 08 Track C.4 + C.5)
# Deny-substring list mirrors crate::dynamic::policy::DENY_KEY_SUBSTRINGS; keep
# in sync when the host-side policy gains new entries.
_NYX_DENY_SUBSTRINGS = (
"TOKEN", "SECRET", "PASSWORD", "PASSWD", "API_KEY", "APIKEY",
"PRIVATE_KEY", "CREDENTIAL", "SESSION", "COOKIE", "AUTH", "BEARER",
"AWS_ACCESS", "AWS_SESSION", "GH_TOKEN", "GITHUB_TOKEN", "NPM_TOKEN",
"PYPI_TOKEN", "DOCKER_PASS",
)
_NYX_PAYLOAD_LIMIT = 16 * 1024
_NYX_REDACTED = "<redacted-by-nyx-policy>"
def __nyx_scrub_env():
import os
out = {}
for k, v in os.environ.items():
ku = str(k).upper()
if any(n in ku for n in _NYX_DENY_SUBSTRINGS):
out[k] = _NYX_REDACTED
else:
out[k] = v
return out
def __nyx_witness(sink_callee, args):
import os
payload = os.environ.get("NYX_PAYLOAD", "")
payload_bytes = payload.encode("utf-8", "replace") if isinstance(payload, str) else bytes(payload)
if len(payload_bytes) > _NYX_PAYLOAD_LIMIT:
payload_bytes = payload_bytes[:_NYX_PAYLOAD_LIMIT]
args_repr = []
for a in args:
if isinstance(a, (bytes, bytearray)):
args_repr.append("<bytes:%d>" % len(a))
else:
args_repr.append(str(a))
try:
cwd = os.getcwd()
except OSError:
cwd = ""
return {
"env_snapshot": __nyx_scrub_env(),
"cwd": cwd,
"payload_bytes": list(payload_bytes),
"callee": str(sink_callee),
"args_repr": args_repr,
}
def __nyx_emit(rec):
import os, json
p = os.environ.get("NYX_PROBE_PATH")
if not p:
return
try:
with open(p, "a") as _f:
_f.write(json.dumps(rec) + "\n")
except OSError:
pass
def __nyx_probe(sink_callee, *args):
import os, time
serialised = []
for a in args:
if isinstance(a, (bytes, bytearray)):
@ -72,12 +126,45 @@ def __nyx_probe(sink_callee, *args):
"args": serialised,
"captured_at_ns": time.time_ns(),
"payload_id": os.environ.get("NYX_PAYLOAD_ID", ""),
"kind": {"kind": "Normal"},
"witness": __nyx_witness(sink_callee, args),
}
try:
with open(p, "a") as _f:
_f.write(json.dumps(rec) + "\n")
except OSError:
pass
__nyx_emit(rec)
# Phase 08: sink-site signal handler. Call __nyx_install_crash_guard before
# invoking the instrumented sink so a SIGSEGV / SIGABRT / etc. is captured as
# a Crash probe (with witness) before the process aborts. The shim re-raises
# the signal on the default handler after writing so process-level outcome
# observers (exit_code) still see the death.
_NYX_SIGNAL_NAMES = {}
def __nyx_install_crash_guard(sink_callee):
import signal, os, time
catchable = []
for nm in ("SIGSEGV", "SIGABRT", "SIGBUS", "SIGFPE", "SIGILL"):
s = getattr(signal, nm, None)
if s is not None:
catchable.append((nm, s))
_NYX_SIGNAL_NAMES[s] = nm
def _handler(signum, frame):
nm = _NYX_SIGNAL_NAMES.get(signum, "SIG?")
rec = {
"sink_callee": str(sink_callee),
"args": [],
"captured_at_ns": time.time_ns(),
"payload_id": os.environ.get("NYX_PAYLOAD_ID", ""),
"kind": {"kind": "Crash", "signal": nm},
"witness": __nyx_witness(sink_callee, []),
}
__nyx_emit(rec)
# Reset to default and re-raise so the process actually dies.
signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum)
for _nm, s in catchable:
try:
signal.signal(s, _handler)
except (OSError, ValueError):
pass
"#
}