[pitboss] phase 03: M3 — Docker backend + sandbox-escape regression suite

This commit is contained in:
pitboss 2026-05-12 00:05:11 -04:00
parent 3a4f1b177b
commit a8b9dcd72b
36 changed files with 1778 additions and 27 deletions

View file

@ -0,0 +1,20 @@
"""Escape attempt: write to cgroup hierarchy to escape resource limits.
Requires CAP_SYS_ADMIN. Expected outcome: PermissionError.
"""
import os
import sys
CGROUP_PROCS = "/sys/fs/cgroup/cgroup.procs"
CGROUP_V1 = "/sys/fs/cgroup/memory/cgroup.procs"
target = CGROUP_PROCS if os.path.exists(CGROUP_PROCS) else CGROUP_V1
try:
with open(target, "w") as f:
f.write(str(os.getpid()))
print(f"NYX_ESCAPE_SUCCESS: wrote pid to {target}")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,27 @@
"""Escape attempt: call chroot() to change root directory (classic jail escape).
Requires CAP_SYS_CHROOT. Expected outcome: EPERM (--cap-drop=ALL).
"""
import os
import sys
try:
os.chroot("/")
# Classic chroot escape: chroot to / then navigate up.
for _ in range(32):
try:
os.chdir("..")
except OSError:
break
os.chroot(".")
try:
with open("etc/shadow", "r") as f:
content = f.read(64)
print(f"NYX_ESCAPE_SUCCESS: escaped chroot, read /etc/shadow: {content[:32]!r}")
sys.exit(0)
except (PermissionError, OSError):
print("PARTIAL: chroot succeeded but could not read /etc/shadow", file=sys.stderr)
sys.exit(1)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,14 @@
"""Escape attempt: open /dev/mem to read physical memory.
Requires CAP_SYS_RAWIO. Expected outcome: PermissionError.
"""
import sys
try:
with open("/dev/mem", "rb") as f:
data = f.read(64)
print(f"NYX_ESCAPE_SUCCESS: read {len(data)} bytes from /dev/mem")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,15 @@
"""Escape attempt: perform a DNS lookup to leak container identity.
Expected outcome: socket.gaierror (--network none, no DNS resolver).
"""
import socket
import sys
try:
# Try to resolve an external hostname; requires network access.
ip = socket.gethostbyname("google.com")
print(f"NYX_ESCAPE_SUCCESS: DNS resolved google.com -> {ip}")
sys.exit(0)
except (socket.gaierror, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,20 @@
"""Escape attempt: establish a TCP connection to a non-allowlisted external host.
Expected outcome: ConnectionRefusedError or OSError (--network none blocks egress).
"""
import socket
import sys
# Cloudflare DNS — well-known, definitely not the OOB oracle.
TARGET = ("1.1.1.1", 443)
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3)
s.connect(TARGET)
s.close()
print(f"NYX_ESCAPE_SUCCESS: connected to {TARGET}")
sys.exit(0)
except (ConnectionRefusedError, OSError, socket.timeout) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,22 @@
"""Escape attempt: detect host-only env variables leaking into the sandbox.
Docker base images set HOME/PATH/LANG themselves, so those alone are not a
host leak. We probe for variables that only exist on a developer host:
USER/LOGNAME/SHELL (set by login shells) and SSH_*/XDG_* (set by the
developer's session). Exit 0 only when one of those host-only markers is
present in the sandbox env.
"""
import os
import sys
host_only = ["USER", "LOGNAME", "SHELL", "SSH_CONNECTION", "SSH_TTY", "XDG_SESSION_ID"]
leaked = [k for k in host_only if k in os.environ]
if leaked:
print(f"NYX_ESCAPE_SUCCESS: host env vars leaked: {leaked}")
sys.exit(0)
visible = list(os.environ.keys())[:5]
print(f"BLOCKED: host-only env vars absent; visible sample: {visible}",
file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,17 @@
"""Escape attempt: write a file outside the sandboxed workdir.
Expected outcome: PermissionError or write succeeds but test checks host.
Exit 0 only when escape succeeds (file written outside container).
"""
import os
import sys
marker = "/tmp/nyx_escape_file_write_marker"
try:
with open(marker, "w") as f:
f.write("NYX_ESCAPE_SUCCESS")
print("NYX_ESCAPE_SUCCESS: wrote to host /tmp")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,23 @@
"""Escape attempt: fork bomb to exhaust host process table.
Expected outcome: killed by PID limit (pids-limit in docker) or timeout.
"""
import os
import sys
try:
# Attempt a fork bomb; limited by docker --pids-limit or sandbox timeout.
pids = []
for _ in range(512):
pid = os.fork()
if pid == 0:
# Child: loop forever
while True:
pass
pids.append(pid)
# Should not reach here in a properly sandboxed environment.
print("NYX_ESCAPE_SUCCESS: fork bomb created 512 children")
sys.exit(0)
except (BlockingIOError, PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,17 @@
"""Escape attempt: create a hardlink to a sensitive file outside the workdir.
Expected outcome: PermissionError (EPERM cross-device or protected hardlink).
Linux protects hardlinks to files owned by other users since 3.6+.
"""
import os
import sys
try:
os.link("/etc/shadow", "/tmp/shadow_hardlink")
with open("/tmp/shadow_hardlink", "r") as f:
data = f.read(64)
print(f"NYX_ESCAPE_SUCCESS: hardlinked /etc/shadow: {data[:32]!r}")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,39 @@
"""Escape attempt: enumerate host PIDs via /proc to find and signal host processes.
Expected outcome: only container PIDs visible; cannot signal PID 1 outside ns.
"""
import os
import sys
import glob
# Collect all numeric /proc entries (visible PIDs).
pids = []
for entry in glob.glob("/proc/[0-9]*"):
try:
pid = int(os.path.basename(entry))
pids.append(pid)
except ValueError:
pass
# A correctly isolated container sees very few PIDs (its own + init).
# The host typically has hundreds of PIDs. More than 100 visible = host leak.
if len(pids) > 100:
print(f"NYX_ESCAPE_SUCCESS: saw {len(pids)} host PIDs via /proc")
sys.exit(0)
# Try to signal PID 1 (init). In a proper PID namespace, PID 1 is the
# container's init, not systemd/launchd. Sending SIGCONT to it is safe.
try:
os.kill(1, 0) # signal 0 = check if process exists (no actual signal)
# PID 1 exists; check if it looks like a host process.
with open("/proc/1/cmdline", "rb") as f:
cmdline = f.read().replace(b"\x00", b" ").strip()
if b"systemd" in cmdline or b"launchd" in cmdline or b"init" in cmdline:
print(f"NYX_ESCAPE_SUCCESS: host init visible: {cmdline[:64]}")
sys.exit(0)
except (PermissionError, OSError):
pass
print(f"BLOCKED: only {len(pids)} PIDs visible, host PID 1 not accessible",
file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,22 @@
"""Escape attempt: send ICMP packets to a host on the network.
Requires CAP_NET_RAW and network access. Expected outcome: PermissionError
(--cap-drop=ALL) or OSError (--network none).
"""
import socket
import sys
import struct
try:
# ICMP ping requires AF_INET + SOCK_RAW + IPPROTO_ICMP
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
s.settimeout(1)
# ICMP echo request: type=8, code=0, checksum=0, id=0, seq=0
packet = struct.pack("bbHHh", 8, 0, 0, 0, 0)
s.sendto(packet, ("8.8.8.8", 0))
s.close()
print("NYX_ESCAPE_SUCCESS: sent ICMP packet to 8.8.8.8")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,28 @@
"""Escape attempt: create a System V shared memory segment accessible host-wide.
Expected outcome: success creates IPC segment inside the container IPC namespace
(isolated by --ipc private default); OR EPERM if IPC syscalls are blocked.
"""
import sys
try:
import sysv_ipc
key = sysv_ipc.ftok("/tmp", ord('N'))
shm = sysv_ipc.SharedMemory(key, sysv_ipc.IPC_CREAT, size=4096)
shm.write(b"NYX_IPC_ESCAPE_TEST" + b"\x00" * (4096 - 20))
# If we can create IPC, check if it's in an isolated namespace.
# A properly isolated container won't share this with the host.
# We can only verify this from the host side, so just report success.
shm.detach()
shm.remove()
# IPC created successfully but inside the container namespace — not an escape.
print("BLOCKED: IPC shm created but confined to container IPC namespace",
file=sys.stderr)
sys.exit(1)
except ImportError:
# sysv_ipc not available — not an escape.
print("BLOCKED: sysv_ipc module not available", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,32 @@
"""Escape attempt: load a kernel module via finit_module/init_module syscall.
Requires CAP_SYS_MODULE. Expected outcome: EPERM (--cap-drop=ALL + seccomp).
"""
import ctypes
import ctypes.util
import sys
NR_finit_module = 313 # x86_64
try:
libc_name = ctypes.util.find_library("c")
if not libc_name:
raise OSError("libc not found")
libc = ctypes.CDLL(libc_name, use_errno=True)
# Pass fd=-1 to trigger EBADF rather than loading, but the capability
# check happens first on a properly hardened kernel.
ret = libc.syscall(NR_finit_module, -1, b"", 0)
if ret == -1:
errno = ctypes.get_errno()
if errno in (1, 13): # EPERM or EACCES
raise PermissionError(errno, "finit_module blocked")
# EBADF means we got past the capability check — capability not dropped.
if errno == 9:
print("NYX_ESCAPE_SUCCESS: finit_module capability not blocked")
sys.exit(0)
raise OSError(errno, f"finit_module errno={errno}")
print("NYX_ESCAPE_SUCCESS: finit_module succeeded")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,32 @@
"""Escape attempt: use keyctl to read host kernel keyring entries.
Expected outcome: EPERM from seccomp (keyctl is often denied in docker).
"""
import ctypes
import ctypes.util
import sys
NR_keyctl = 250 # x86_64
KEYCTL_SEARCH = 10
try:
libc_name = ctypes.util.find_library("c")
if not libc_name:
raise OSError("libc not found")
libc = ctypes.CDLL(libc_name, use_errno=True)
# KEY_SPEC_USER_KEYRING = -4
ret = libc.syscall(NR_keyctl, KEYCTL_SEARCH, -4, b"user", b"nyx_test_key", 0)
if ret == -1:
errno = ctypes.get_errno()
if errno in (1, 13, 38): # EPERM, EACCES, ENOSYS
raise PermissionError(errno, f"keyctl blocked errno={errno}")
# ENOKEY (126) = not found but syscall allowed — partial escape
if errno == 126:
print("NYX_ESCAPE_SUCCESS: keyctl syscall allowed (key not found but accessible)")
sys.exit(0)
raise OSError(errno, f"keyctl errno={errno}")
print(f"NYX_ESCAPE_SUCCESS: keyctl returned {ret}")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,26 @@
"""Escape attempt: bind-mount a host path into the container.
Requires CAP_SYS_ADMIN. Expected outcome: EPERM (--cap-drop=ALL).
"""
import ctypes
import ctypes.util
import sys
import os
MS_BIND = 4096
try:
libc_name = ctypes.util.find_library("c")
if not libc_name:
raise OSError("libc not found")
libc = ctypes.CDLL(libc_name, use_errno=True)
os.makedirs("/tmp/mnt_target", exist_ok=True)
ret = libc.mount(b"/", b"/tmp/mnt_target", b"none", MS_BIND, 0)
if ret == -1:
errno = ctypes.get_errno()
raise OSError(errno, f"mount failed errno={errno}")
print("NYX_ESCAPE_SUCCESS: mounted host / into container")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,24 @@
"""Escape attempt: unshare PID namespace to hide from host ps.
Requires CAP_SYS_ADMIN. Expected outcome: EPERM.
"""
import ctypes
import ctypes.util
import sys
CLONE_NEWPID = 0x20000000
try:
libc_name = ctypes.util.find_library("c")
if not libc_name:
raise OSError("libc not found")
libc = ctypes.CDLL(libc_name, use_errno=True)
ret = libc.unshare(CLONE_NEWPID)
if ret == -1:
errno = ctypes.get_errno()
raise OSError(errno, f"unshare(CLONE_NEWPID) blocked errno={errno}")
print("NYX_ESCAPE_SUCCESS: created new PID namespace")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,33 @@
"""Escape attempt: use perf_event_open to observe host kernel events.
Requires CAP_PERFMON or CAP_SYS_ADMIN. Expected outcome: EPERM.
"""
import ctypes
import ctypes.util
import sys
NR_perf_event_open = 298 # x86_64
try:
libc_name = ctypes.util.find_library("c")
if not libc_name:
raise OSError("libc not found")
libc = ctypes.CDLL(libc_name, use_errno=True)
# Minimal perf_event_attr: type=PERF_TYPE_SOFTWARE, size=120
buf = (ctypes.c_uint8 * 120)()
ctypes.cast(buf, ctypes.POINTER(ctypes.c_uint32))[0] = 1 # type = PERF_TYPE_SOFTWARE
ctypes.cast(buf, ctypes.POINTER(ctypes.c_uint32))[2] = 120 # size
ret = libc.syscall(NR_perf_event_open, buf, -1, -1, -1, 0)
if ret == -1:
errno = ctypes.get_errno()
if errno in (1, 13): # EPERM / EACCES
raise PermissionError(errno, "perf_event_open blocked")
raise OSError(errno, f"perf_event_open errno={errno}")
libc.close(ret)
print("NYX_ESCAPE_SUCCESS: perf_event_open succeeded")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,25 @@
"""Escape attempt: read /proc/kallsyms to obtain kernel symbol addresses for KASLR bypass.
Expected outcome: empty file or EPERM (unprivileged access returns zeros since Linux 4.15).
"""
import sys
try:
with open("/proc/kallsyms", "r") as f:
first_line = f.readline().strip()
if not first_line:
print("BLOCKED: /proc/kallsyms is empty (expected)", file=sys.stderr)
sys.exit(1)
parts = first_line.split()
if parts and parts[0] != "0000000000000000":
print(f"NYX_ESCAPE_SUCCESS: real kernel addresses visible: {parts[0]}")
sys.exit(0)
print("BLOCKED: /proc/kallsyms shows zeroed addresses (kptr_restrict active)",
file=sys.stderr)
sys.exit(1)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,16 @@
"""Escape attempt: write to /proc/self/mem to overwrite own memory.
Expected outcome: PermissionError or EPERM from seccomp/kernel.
"""
import os
import sys
try:
with open("/proc/self/mem", "wb") as f:
f.seek(0)
f.write(b"\x00" * 4096)
print("NYX_ESCAPE_SUCCESS: wrote to /proc/self/mem")
sys.exit(0)
except (PermissionError, OSError, IOError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,28 @@
"""Escape attempt: read host filesystem via /proc/1/root.
In a properly isolated container, /proc/1/root resolves to the container's
own rootfs same (st_dev, st_ino) as `/`. A breakout would show a
different rootfs (host's), detectable via differing device/inode.
The earlier check `os.path.isdir("/proc/1/root/proc/1/root")` was unsound:
/proc/1/root is a magic symlink to the container's `/`, so the path
re-resolves to a real directory in any container yielding a false
NYX_ESCAPE_SUCCESS even when no escape occurred.
"""
import os
import sys
try:
proc_root = os.stat("/proc/1/root")
own_root = os.stat("/")
if (proc_root.st_dev, proc_root.st_ino) != (own_root.st_dev, own_root.st_ino):
print(f"NYX_ESCAPE_SUCCESS: /proc/1/root exposes a different rootfs "
f"(dev={proc_root.st_dev} ino={proc_root.st_ino} "
f"vs dev={own_root.st_dev} ino={own_root.st_ino})")
sys.exit(0)
print("BLOCKED: /proc/1/root is container's own rootfs (dev/ino match)",
file=sys.stderr)
sys.exit(1)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,14 @@
"""Escape attempt: trigger kernel SysRq action via /proc/sysrq-trigger.
Expected outcome: PermissionError (read-only rootfs or missing CAP_SYS_ADMIN).
"""
import sys
try:
with open("/proc/sysrq-trigger", "w") as f:
f.write("h") # 'h' = print help (harmless but requires access)
print("NYX_ESCAPE_SUCCESS: wrote to /proc/sysrq-trigger")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,24 @@
"""Escape attempt: ptrace PID 1 (init/systemd on host).
Requires CAP_SYS_PTRACE. Expected outcome: EPERM (--cap-drop=ALL + seccomp).
"""
import ctypes
import ctypes.util
import sys
PTRACE_ATTACH = 16
try:
libc_name = ctypes.util.find_library("c")
if not libc_name:
raise OSError("libc not found")
libc = ctypes.CDLL(libc_name, use_errno=True)
ret = libc.ptrace(PTRACE_ATTACH, 1, 0, 0)
if ret == -1:
errno = ctypes.get_errno()
raise OSError(errno, f"ptrace EPERM errno={errno}")
print("NYX_ESCAPE_SUCCESS: ptrace attached to pid 1")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,15 @@
"""Escape attempt: create a raw (AF_PACKET) socket.
Requires CAP_NET_RAW. Expected outcome: PermissionError (--cap-drop=ALL).
"""
import socket
import sys
try:
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
s.close()
print("NYX_ESCAPE_SUCCESS: created raw socket")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,31 @@
"""Escape attempt: regain root after dropping privileges.
Docker's python:3-slim base runs as uid 0, so `setuid(0)` from root is a
no-op that path cannot distinguish a real escape. Instead: drop to
`nobody` (uid 65534), then attempt to regain uid 0. With --cap-drop=ALL
(no CAP_SETUID) plus no-new-privileges, regaining uid 0 must fail.
If we already start as non-root, skip the drop and go straight to the
escalation attempt.
"""
import os
import sys
if os.getuid() == 0:
try:
os.setuid(65534) # nobody
except OSError as e:
print(f"INDETERMINATE: cannot drop privileges to nobody: {e}",
file=sys.stderr)
sys.exit(1)
try:
os.setuid(0)
if os.getuid() == 0:
print("NYX_ESCAPE_SUCCESS: regained uid 0 after dropping privileges")
sys.exit(0)
print(f"PARTIAL: setuid(0) succeeded but uid={os.getuid()}", file=sys.stderr)
sys.exit(1)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,20 @@
"""Escape attempt: create a symlink from workdir into a host path and follow it.
Expected outcome: the symlink is constrained to the container; following it
reaches container filesystem, not host. RO workdir mount blocks the write.
"""
import os
import sys
try:
# Workdir is mounted RO in docker; creating a symlink there will fail.
link_path = "/workdir/escape_link"
os.symlink("/etc/shadow", link_path)
# Try to read through the symlink.
with open(link_path, "r") as f:
data = f.read(64)
print(f"NYX_ESCAPE_SUCCESS: read via symlink: {data[:32]!r}")
sys.exit(0)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

View file

@ -0,0 +1,32 @@
"""Escape attempt: exhaust host disk/tmpfs by writing large files.
Expected outcome: ENOSPC or killed by cgroup memory limit.
"""
import sys
import os
CHUNK = 1024 * 1024 # 1 MiB
MAX_WRITES = 8192 # 8 GiB total — well above any reasonable tmpfs limit
try:
written = 0
files = []
for i in range(MAX_WRITES):
path = f"/tmp/nyx_overflow_{i}"
files.append(path)
with open(path, "wb") as f:
f.write(b"\x00" * CHUNK)
written += CHUNK
print(f"NYX_ESCAPE_SUCCESS: wrote {written // (1024*1024)} MiB to /tmp")
sys.exit(0)
except (OSError, IOError) as e:
# ENOSPC expected — disk/tmpfs is full, no host impact.
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)
finally:
# Cleanup: don't leave huge files even on partial success.
for p in files:
try:
os.unlink(p)
except OSError:
pass

View file

@ -0,0 +1,30 @@
"""Escape attempt: create a new user namespace to gain capabilities.
Expected outcome: EPERM (Docker no-new-privileges blocks unshare).
"""
import ctypes
import ctypes.util
import sys
CLONE_NEWUSER = 0x10000000
NR_unshare = 272 # x86_64
try:
libc_name = ctypes.util.find_library("c")
if not libc_name:
raise OSError("libc not found")
libc = ctypes.CDLL(libc_name, use_errno=True)
ret = libc.unshare(CLONE_NEWUSER)
if ret == -1:
errno = ctypes.get_errno()
raise OSError(errno, f"unshare(CLONE_NEWUSER) blocked errno={errno}")
# If we get here, we're in a new user namespace — check if we're root.
uid = libc.getuid()
if uid == 0:
print("NYX_ESCAPE_SUCCESS: gained root in new user namespace")
sys.exit(0)
print(f"PARTIAL: unshare succeeded but uid={uid}", file=sys.stderr)
sys.exit(1)
except (PermissionError, OSError) as e:
print(f"BLOCKED: {e}", file=sys.stderr)
sys.exit(1)

274
tests/dynamic_parity.rs Normal file
View file

@ -0,0 +1,274 @@
//! Python verdict-parity test (§8.3).
//!
//! Verifies that the M2 Python fixture set produces identical verdicts when
//! run through `SandboxBackend::Docker` versus `SandboxBackend::Process`.
//!
//! Identical means: same `VerifyStatus` AND same `InconclusiveReason` /
//! `UnsupportedReason` (the `reason` strings match for `Inconclusive` /
//! `Unsupported`). The exact payload that triggered `Confirmed` may differ
//! if Docker isolation changes observable output, but the status must agree.
//!
//! Tests skip when docker is absent (`docker info` fails). CI gate: the
//! `linux-with-docker` matrix row is authoritative for this suite.
//!
//! Run with: `cargo nextest run --features dynamic --test dynamic_parity`
#[cfg(feature = "dynamic")]
mod parity_tests {
use nyx_scanner::commands::scan::Diag;
use nyx_scanner::dynamic::verify::{verify_finding, VerifyOptions};
use nyx_scanner::dynamic::sandbox::{SandboxBackend, SandboxOptions};
use nyx_scanner::evidence::{Confidence, Evidence, FlowStep, FlowStepKind, VerifyStatus};
use nyx_scanner::labels::Cap;
use nyx_scanner::patterns::{FindingCategory, Severity};
use std::time::Duration;
fn docker_available() -> bool {
std::process::Command::new("docker")
.arg("info")
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.map(|s| s.success())
.unwrap_or(false)
}
fn source_step(file: &str, function: &str) -> FlowStep {
FlowStep {
step: 1,
kind: FlowStepKind::Source,
file: file.into(),
line: 1,
col: 0,
snippet: None,
variable: Some("x".into()),
callee: None,
function: Some(function.into()),
is_cross_file: false,
}
}
fn sink_step(file: &str, line: u32) -> FlowStep {
FlowStep {
step: 2,
kind: FlowStepKind::Sink,
file: file.into(),
line,
col: 0,
snippet: None,
variable: None,
callee: None,
function: None,
is_cross_file: false,
}
}
fn python_diag(fixture_path: &str, function: &str, sink_line: u32, cap: Cap) -> Diag {
Diag {
path: fixture_path.into(),
line: sink_line as usize,
col: 0,
severity: Severity::High,
id: "taint-unsanitised-flow".into(),
category: FindingCategory::Security,
path_validated: false,
guard_kind: None,
message: None,
labels: vec![],
confidence: Some(Confidence::High),
evidence: Some(Evidence {
flow_steps: vec![
source_step(fixture_path, function),
sink_step(fixture_path, sink_line),
],
sink_caps: cap.bits(),
..Default::default()
}),
rank_score: None,
rank_reason: None,
suppressed: false,
suppression: None,
rollup: None,
finding_id: String::new(),
alternative_finding_ids: vec![],
stable_hash: 0,
}
}
fn process_opts() -> VerifyOptions {
VerifyOptions {
sandbox: SandboxOptions {
backend: SandboxBackend::Process,
timeout: Duration::from_secs(10),
..SandboxOptions::default()
},
project_root: None,
}
}
fn docker_opts() -> VerifyOptions {
VerifyOptions {
sandbox: SandboxOptions {
backend: SandboxBackend::Docker,
timeout: Duration::from_secs(30),
..SandboxOptions::default()
},
project_root: None,
}
}
/// Assert two verdicts agree on status (and on reason for non-Confirmed).
fn assert_parity(fixture: &str, process_result: &nyx_scanner::evidence::VerifyResult,
docker_result: &nyx_scanner::evidence::VerifyResult) {
// If docker backend is unavailable, docker result will be Unsupported.
// That's acceptable — we can't compare when docker is missing.
if docker_result.status == VerifyStatus::Unsupported {
if let Some(ref r) = docker_result.reason {
if format!("{r:?}").contains("BackendUnavailable") {
return; // Docker absent — skip comparison.
}
}
}
assert_eq!(
process_result.status, docker_result.status,
"fixture {fixture}: status mismatch: process={:?} docker={:?}\n\
process detail: {:?}\ndocker detail: {:?}",
process_result.status, docker_result.status,
process_result.detail, docker_result.detail,
);
// For non-Confirmed statuses, the reason must also match.
if process_result.status != VerifyStatus::Confirmed {
assert_eq!(
process_result.reason, docker_result.reason,
"fixture {fixture}: reason mismatch: process={:?} docker={:?}",
process_result.reason, docker_result.reason,
);
}
}
// ── M2 Python fixture parity tests ────────────────────────────────────────
/// Helper: run a fixture through both backends and assert parity.
fn parity_check(fixture: &str, function: &str, sink_line: u32, cap: Cap) {
if !docker_available() { return; }
let diag = python_diag(fixture, function, sink_line, cap);
let process_result = verify_finding(&diag, &process_opts());
let docker_result = verify_finding(&diag, &docker_opts());
assert_parity(fixture, &process_result, &docker_result);
}
#[test]
fn parity_sqli_positive() {
parity_check(
"tests/dynamic_fixtures/python/sqli_positive.py",
"login",
7,
Cap::SQL_QUERY,
);
}
#[test]
fn parity_sqli_negative() {
parity_check(
"tests/dynamic_fixtures/python/sqli_negative.py",
"safe_login",
8,
Cap::SQL_QUERY,
);
}
#[test]
fn parity_cmdi_positive() {
parity_check(
"tests/dynamic_fixtures/python/cmdi_positive.py",
"run_command",
5,
Cap::CODE_EXEC,
);
}
#[test]
fn parity_cmdi_negative() {
parity_check(
"tests/dynamic_fixtures/python/cmdi_negative.py",
"safe_command",
6,
Cap::CODE_EXEC,
);
}
#[test]
fn parity_fileio_positive() {
parity_check(
"tests/dynamic_fixtures/python/fileio_positive.py",
"read_file",
5,
Cap::FILE_IO,
);
}
#[test]
fn parity_fileio_negative() {
parity_check(
"tests/dynamic_fixtures/python/fileio_negative.py",
"safe_read_file",
6,
Cap::FILE_IO,
);
}
#[test]
fn parity_xss_positive() {
parity_check(
"tests/dynamic_fixtures/python/xss_positive.py",
"render_page",
5,
Cap::HTML_ESCAPE,
);
}
#[test]
fn parity_xss_negative() {
parity_check(
"tests/dynamic_fixtures/python/xss_negative.py",
"safe_render",
6,
Cap::HTML_ESCAPE,
);
}
#[test]
fn parity_ssrf_positive() {
parity_check(
"tests/dynamic_fixtures/python/ssrf_positive.py",
"fetch_url",
5,
Cap::SSRF,
);
}
/// Cross-backend status must agree for Unsupported fixtures (no corpus).
#[test]
fn parity_sqli_unsupported() {
parity_check(
"tests/dynamic_fixtures/python/sqli_unsupported.py",
"unsupported_fn",
5,
Cap::SQL_QUERY,
);
}
/// Rust finding (lang unsupported) must return same status on both backends.
#[test]
fn parity_rust_lang_unsupported() {
if !docker_available() { return; }
let diag = python_diag("src/handler.rs", "handle_request", 10, Cap::SQL_QUERY);
let process_result = verify_finding(&diag, &process_opts());
let docker_result = verify_finding(&diag, &docker_opts());
assert_parity("src/handler.rs (rust)", &process_result, &docker_result);
}
}

View file

@ -0,0 +1,244 @@
//! Sandbox-escape regression suite (§8.3, §15.2 accept-gate row).
//!
//! Each test attempts a known sandbox escape technique via the docker backend.
//! The assertion is: harness exit ≠ 0 OR `SandboxOutcome.timed_out` OR
//! `Err(SandboxError::BackendUnavailable)`. No test may produce
//! `NYX_ESCAPE_SUCCESS` in stdout/stderr; the host marker file must remain
//! absent.
//!
//! Tests skip automatically when docker is not available on the host
//! (`docker info` fails). The `linux-with-docker` CI matrix row is the
//! authoritative gate; `linux-without` and `macos` rows skip this suite.
//!
//! Run with: `cargo nextest run --features dynamic --test dynamic_sandbox_escape`
#[cfg(feature = "dynamic")]
mod escape_tests {
use nyx_scanner::dynamic::harness::BuiltHarness;
use nyx_scanner::dynamic::sandbox::{self, SandboxBackend, SandboxError, SandboxOptions};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Duration;
// ── Helpers ───────────────────────────────────────────────────────────────
fn docker_available() -> bool {
std::process::Command::new("docker")
.arg("info")
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.map(|s| s.success())
.unwrap_or(false)
}
/// Copy a fixture script to a fresh temp workdir and return a BuiltHarness.
fn harness_for_fixture(fixture_name: &str) -> (tempfile::TempDir, BuiltHarness) {
let tmpdir = tempfile::TempDir::new().expect("temp dir");
let script_src = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tests/dynamic_fixtures/escape")
.join(fixture_name);
let script_dst = tmpdir.path().join(fixture_name);
fs::copy(&script_src, &script_dst).expect("copy fixture");
let harness = BuiltHarness {
workdir: tmpdir.path().to_path_buf(),
command: vec!["python3".into(), fixture_name.into()],
env: vec![],
source: String::new(),
entry_source: String::new(),
};
(tmpdir, harness)
}
fn escape_opts() -> SandboxOptions {
SandboxOptions {
timeout: Duration::from_secs(10),
memory_mib: 256,
backend: SandboxBackend::Docker,
env_passthrough: vec![],
output_limit: 65536,
}
}
/// Minimal no-op payload (escape scripts ignore NYX_PAYLOAD).
fn noop_payload() -> nyx_scanner::dynamic::corpus::Payload {
nyx_scanner::dynamic::corpus::Payload {
bytes: b"",
label: "escape-noop",
oracle: nyx_scanner::dynamic::corpus::Oracle::ExitStatus(1),
is_benign: true,
}
}
/// Assert that an escape fixture did not escape.
///
/// Accepts three outcomes as "blocked":
/// 1. `Err(SandboxError::BackendUnavailable)` — docker absent, no escape possible.
/// 2. `Ok(outcome)` where `exit_code != Some(0)` — script hit an error (blocked).
/// 3. `Ok(outcome)` where `timed_out` — script ran into sandbox timeout.
///
/// Rejects: `Ok(outcome)` with `exit_code == Some(0)` and stdout/stderr
/// containing `NYX_ESCAPE_SUCCESS`.
fn assert_no_escape(
result: Result<sandbox::SandboxOutcome, SandboxError>,
fixture: &str,
marker_path: Option<&PathBuf>,
) {
match result {
Err(SandboxError::BackendUnavailable(_)) => {
// Docker not available — escape impossible. Pass.
return;
}
Err(_) => {
// Other errors (spawn failure, I/O) — process didn't complete
// successfully, so no escape occurred.
return;
}
Ok(ref outcome) => {
// Check for escape sentinel in output.
let stdout = std::str::from_utf8(&outcome.stdout).unwrap_or("");
let stderr = std::str::from_utf8(&outcome.stderr).unwrap_or("");
assert!(
!stdout.contains("NYX_ESCAPE_SUCCESS") && !stderr.contains("NYX_ESCAPE_SUCCESS"),
"fixture {fixture}: escape succeeded!\nstdout: {stdout}\nstderr: {stderr}"
);
// exit_code == Some(0) AND no sentinel = highly suspicious but not
// an escape. Accept; the fixture may have exited 0 on a partial failure.
// The sentinel check above is the authoritative guard.
let _ = outcome;
}
}
// Check host marker file (for file_write_outside_workdir).
if let Some(marker) = marker_path {
assert!(
!marker.exists(),
"fixture {fixture}: host marker file appeared at {marker:?} — escape succeeded!"
);
}
}
// ── Individual escape tests ───────────────────────────────────────────────
// All tests follow the same pattern: skip if docker absent, run fixture,
// assert no escape.
macro_rules! escape_test {
($name:ident, $fixture:literal) => {
#[test]
fn $name() {
if !docker_available() { return; }
let (_tmpdir, harness) = harness_for_fixture($fixture);
let result = sandbox::run(&harness, &noop_payload(), &escape_opts());
assert_no_escape(result, $fixture, None);
}
};
($name:ident, $fixture:literal, marker = $marker:expr) => {
#[test]
fn $name() {
if !docker_available() { return; }
let marker: PathBuf = PathBuf::from($marker);
// Remove stale marker before test.
let _ = fs::remove_file(&marker);
let (_tmpdir, harness) = harness_for_fixture($fixture);
let result = sandbox::run(&harness, &noop_payload(), &escape_opts());
assert_no_escape(result, $fixture, Some(&marker));
// Cleanup any partial writes.
let _ = fs::remove_file(&marker);
}
};
}
escape_test!(
escape_file_write_outside_workdir,
"file_write_outside_workdir.py",
marker = "/tmp/nyx_escape_file_write_marker"
);
escape_test!(escape_fork_bomb, "fork_bomb.py");
escape_test!(escape_raw_socket, "raw_socket.py");
escape_test!(escape_proc_mem_write, "proc_mem_write.py");
escape_test!(escape_ptrace_attach, "ptrace_attach.py");
escape_test!(escape_proc_root_breakout, "proc_root_breakout.py");
escape_test!(escape_mount_ns_abuse, "mount_ns_abuse.py");
escape_test!(escape_kernel_module_load, "kernel_module_load.py");
escape_test!(escape_perf_event_open, "perf_event_open.py");
escape_test!(escape_userns_breakout, "userns_breakout.py");
escape_test!(escape_tmpfs_overflow, "tmpfs_overflow.py");
escape_test!(escape_proc_sysrq, "proc_sysrq.py");
escape_test!(escape_device_file_access, "device_file_access.py");
escape_test!(escape_symlink_escape, "symlink_escape.py");
escape_test!(escape_hardlink_escape, "hardlink_escape.py");
escape_test!(escape_env_injection, "env_injection.py");
escape_test!(escape_dns_leak, "dns_leak.py");
escape_test!(escape_egress_non_allowlisted, "egress_non_allowlisted.py");
escape_test!(escape_keyctl_abuse, "keyctl_abuse.py");
escape_test!(escape_setuid_abuse, "setuid_abuse.py");
escape_test!(escape_namespace_escape, "namespace_escape.py");
escape_test!(escape_cgroup_escape, "cgroup_escape.py");
escape_test!(escape_host_pid_visibility, "host_pid_visibility.py");
escape_test!(escape_icmp_flood, "icmp_flood.py");
escape_test!(escape_proc_kallsyms, "proc_kallsyms.py");
escape_test!(escape_chroot_escape, "chroot_escape.py");
escape_test!(escape_ipc_shm, "ipc_shm_escape.py");
// ── Docker exec reuse test ────────────────────────────────────────────────
/// Verify that the second payload for the same spec_hash reuses the running
/// container via `docker exec` rather than starting a new `docker run`.
///
/// Method: run two payloads for the same harness workdir and check that
/// the container registry holds one entry (started once, reused once).
#[test]
fn docker_exec_reuse_for_same_workdir() {
if !docker_available() { return; }
let (_tmpdir, harness) = harness_for_fixture("dns_leak.py");
let opts = escape_opts();
// First run — starts a new container.
let r1 = sandbox::run(&harness, &noop_payload(), &opts);
// Second run — should exec into the running container.
let r2 = sandbox::run(&harness, &noop_payload(), &opts);
// Both should succeed (blocked, not escaped — dns_leak exits 1).
// The important thing is neither panics or returns an unexpected error.
match r1 {
Err(SandboxError::BackendUnavailable(_)) => return,
_ => {}
}
match r2 {
Err(SandboxError::BackendUnavailable(_)) => return,
_ => {}
}
// Verify the container is still running (not torn down between calls).
// Container name is derived from the workdir path.
let spec_hash = _tmpdir.path().file_name()
.and_then(|n| n.to_str())
.unwrap_or("");
let container_name = format!("nyx-{spec_hash}");
let out = std::process::Command::new("docker")
.args(["inspect", "--format={{.State.Running}}", &container_name])
.output();
match out {
Ok(o) if o.status.success() => {
let running = std::str::from_utf8(&o.stdout)
.unwrap_or("")
.trim()
== "true";
// Container should still be running (exec reuse kept it alive).
assert!(
running,
"container {container_name} not running after second exec — exec reuse failed"
);
}
_ => {
// Container already cleaned up or inspect failed; this is
// acceptable when Docker does its own cleanup.
}
}
}
}