[pitboss/grind] deferred session-0021 (20260522T043516Z-29b8)

This commit is contained in:
pitboss 2026-05-22 06:48:32 -05:00
parent f265140935
commit cc183a8186
6 changed files with 603 additions and 7 deletions

View file

@ -59,4 +59,62 @@ pub const PAYLOADS: &[CuratedPayload] = &[
benign_control: None,
no_benign_control_rationale: None,
},
// Phase 08 tier-(b): raw-socket wire-frame smuggling payload.
// Same CRLF-bearing bytes as the Flask payload above, but pinned
// to the `python_raw` fixture (a `BaseHTTPRequestHandler` writing
// raw bytes via `self.wfile.write`). The wire frame captured off
// the response socket carries two distinct `Set-Cookie:` lines, so
// `HeaderSmuggledInWire { primary: "Set-Cookie", smuggled:
// "Set-Cookie" }` fires — proving the smuggled header survived to
// the actual wire instead of being CRLF-stripped en route.
//
// Distinct payload (not just an extra predicate on the Flask row)
// because Flask's werkzeug response serializer strips CRLF at the
// wire-write boundary, so the wire-frame predicate would never
// fire against the canonical Flask fixture. See
// `.pitboss/play/deferred.md` (Phase 08 wire-frame option A) for
// the framework-level CRLF-strip empirical from session-0018.
CuratedPayload {
bytes: b"nyx-session\r\nSet-Cookie: nyx-injected=pwn",
label: "header-injection-python-raw-wire-smuggle",
oracle: Oracle::SinkProbe {
predicates: &[ProbePredicate::HeaderSmuggledInWire {
primary: "Set-Cookie",
smuggled: "Set-Cookie",
}],
},
is_benign: false,
provenance: PayloadProvenance::Curated,
since_corpus_version: 12,
deprecated_at_corpus_version: None,
fixture_paths: &["tests/dynamic_fixtures/header_injection/python_raw/vuln.py"],
oob_nonce_slot: false,
probe_predicates: &[ProbePredicate::HeaderSmuggledInWire {
primary: "Set-Cookie",
smuggled: "Set-Cookie",
}],
benign_control: Some(PayloadRef {
label: "header-injection-python-raw-benign",
}),
no_benign_control_rationale: None,
},
CuratedPayload {
bytes: b"nyx-session%0D%0ASet-Cookie%3A%20nyx-injected%3Dpwn",
label: "header-injection-python-raw-benign",
oracle: Oracle::SinkProbe {
predicates: &[ProbePredicate::HeaderSmuggledInWire {
primary: "Set-Cookie",
smuggled: "Set-Cookie",
}],
},
is_benign: true,
provenance: PayloadProvenance::Curated,
since_corpus_version: 12,
deprecated_at_corpus_version: None,
fixture_paths: &["tests/dynamic_fixtures/header_injection/python_raw/vuln.py"],
oob_nonce_slot: false,
probe_predicates: &[],
benign_control: None,
no_benign_control_rationale: None,
},
];

View file

@ -2166,6 +2166,130 @@ pub fn emit_header_injection_harness(spec: &HarnessSpec) -> HarnessSource {
let uses_flask = entry_source.contains("from flask")
|| entry_source.contains("import flask")
|| entry_source.contains("werkzeug.wrappers");
// Phase 08 tier-(b): a fixture that subclasses
// `BaseHTTPRequestHandler` writes bytes straight to the response
// socket via `self.wfile.write`, bypassing every framework-level
// CRLF validator (werkzeug / Flask / axum / Tomcat all strip CRLF
// before write). The harness boots the handler on a loopback
// port and captures the raw response-header block as a
// `ProbeKind::HeaderWireFrame` probe.
let uses_raw_socket = entry_source.contains("BaseHTTPRequestHandler");
let wire_frame_via_fixture = if uses_raw_socket {
format!(
r#"def _nyx_wire_frame_via_fixture(payload):
# Phase 08 tier-(b): boot the fixture's BaseHTTPRequestHandler on
# 127.0.0.1:0, issue one raw-socket GET, read the bytes the handler
# wrote to the response socket up to the CRLF-CRLF boundary.
# Returns the captured header-block bytes on success, or None on
# import / boot failure so the caller can fall back to the inline
# synthetic probe.
import http.server
import socket
import threading
sys.path.insert(0, ".")
try:
mod = importlib.import_module("{module_name}")
except Exception:
return None
Handler = getattr(mod, "VulnHandler", None)
if Handler is None:
return None
try:
if isinstance(payload, str):
Handler.cookie_value = payload.encode("utf-8")
else:
Handler.cookie_value = bytes(payload)
except Exception:
return None
try:
server = http.server.HTTPServer(("127.0.0.1", 0), Handler)
except Exception:
return None
port = server.server_address[1]
t = threading.Thread(target=server.serve_forever, daemon=True)
t.start()
raw = b""
try:
try:
sock = socket.create_connection(("127.0.0.1", port), timeout=5)
except Exception:
return None
try:
sock.settimeout(2.0)
sock.sendall(b"GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n")
while len(raw) < 65536:
try:
chunk = sock.recv(4096)
except socket.timeout:
break
if not chunk:
break
raw += chunk
if b"\r\n\r\n" in raw:
break
finally:
try:
sock.close()
except Exception:
pass
finally:
try:
server.shutdown()
except Exception:
pass
try:
server.server_close()
except Exception:
pass
sep = raw.find(b"\r\n\r\n")
if sep == -1:
return raw
return raw[:sep]
def _nyx_wire_frame_probe(raw_bytes):
rec = {{
"sink_callee": "http.server.wfile.write",
"args": [],
"captured_at_ns": time.time_ns(),
"payload_id": os.environ.get("NYX_PAYLOAD_ID", ""),
"kind": {{"kind": "HeaderWireFrame", "raw_bytes": list(raw_bytes)}},
"witness": __nyx_witness("http.server.wfile.write", []),
}}
__nyx_emit(rec)
"#
)
} else {
String::new()
};
let invoke_via_wire_frame = if uses_raw_socket {
r#" raw_bytes = _nyx_wire_frame_via_fixture(payload)
if raw_bytes is not None:
_nyx_wire_frame_probe(raw_bytes)
# Also emit a HeaderEmit record per Set-Cookie line so the
# tier-(a) HeaderInjected predicate fires on the same payload
# that trips HeaderSmuggledInWire. The wire-frame branch is
# the source of truth; the HeaderEmit records are derived from
# the same captured bytes.
for line in raw_bytes.split(b"\r\n"):
sep = line.find(b": ")
if sep < 0:
continue
name = line[:sep].decode("ascii", "replace")
if name.lower() != "set-cookie":
continue
value = line[sep + 2:].decode("utf-8", "replace")
_nyx_header_probe(name, value)
print("__NYX_SINK_HIT__", flush=True)
sys.stdout.write(json.dumps({"wire_frame_len": len(raw_bytes)}) + "\n")
sys.stdout.flush()
return
"#
} else {
""
};
let via_fixture = if uses_flask {
format!(
r#"def _nyx_header_via_fixture(payload):
@ -2236,10 +2360,14 @@ pub fn emit_header_injection_harness(spec: &HarnessSpec) -> HarnessSource {
} else {
""
};
let importlib_import = if uses_flask { "import importlib\n" } else { "" };
let importlib_import = if uses_flask || uses_raw_socket {
"import importlib\n"
} else {
""
};
let body = format!(
r#"#!/usr/bin/env python3
"""Nyx dynamic harness — HEADER_INJECTION flask.Response.headers.__setitem__ (Phase 08 / Track J.6)."""
"""Nyx dynamic harness — HEADER_INJECTION flask.Response.headers.__setitem__ + raw-socket wire-frame (Phase 08 / Track J.6)."""
{importlib_import}import json
import os
import sys
@ -2263,9 +2391,9 @@ def _nyx_header_probe(name, value):
__nyx_emit(rec)
{via_fixture}def _nyx_run():
{wire_frame_via_fixture}{via_fixture}def _nyx_run():
payload = os.environ.get("NYX_PAYLOAD", "")
{invoke_via_fixture} # Synthetic fallback mirrors
{invoke_via_wire_frame}{invoke_via_fixture} # Synthetic fallback mirrors
# `werkzeug.datastructures.Headers.__setitem__` semantics: the
# value bytes flow through unmodified, so a tainted payload that
# carries raw `\r\n` lands on the wire as a header split.
@ -3754,6 +3882,89 @@ mod tests {
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn emit_header_injection_harness_routes_through_wire_frame_when_base_http_request_handler_imported()
{
let dir = std::env::temp_dir().join("nyx_phase08_py_test_wire_frame");
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir).unwrap();
let entry = dir.join("vuln.py");
std::fs::write(
&entry,
"from http.server import BaseHTTPRequestHandler\n\
class VulnHandler(BaseHTTPRequestHandler):\n cookie_value = b''\n def do_GET(self):\n self.wfile.write(b'HTTP/1.0 200 OK\\r\\nSet-Cookie: ' + self.__class__.cookie_value + b'\\r\\n\\r\\nok')\n",
)
.unwrap();
let h = emit_header_injection_harness(&make_header_spec(
entry.to_str().unwrap(),
"run",
));
assert!(
h.source.contains("def _nyx_wire_frame_via_fixture(payload):"),
"tier-(b) harness must define the wire-frame helper: {}",
h.source
);
assert!(
h.source.contains("http.server.HTTPServer((\"127.0.0.1\", 0)"),
"tier-(b) harness must boot HTTPServer on loopback ephemeral port: {}",
h.source
);
assert!(
h.source.contains("getattr(mod, \"VulnHandler\", None)"),
"tier-(b) harness must look up the VulnHandler class: {}",
h.source
);
assert!(
h.source.contains("raw_bytes = _nyx_wire_frame_via_fixture(payload)"),
"harness main must call the wire-frame helper first when raw-socket fixture detected: {}",
h.source
);
assert!(
h.source
.contains(r#""kind": {"kind": "HeaderWireFrame", "raw_bytes": list(raw_bytes)}"#),
"tier-(b) harness must emit a HeaderWireFrame probe carrying the raw header-block bytes: {}",
h.source
);
// Wire-frame branch also derives HeaderEmit records from the
// captured Set-Cookie lines so the tier-(a) HeaderInjected
// predicate fires on the same payload.
assert!(
h.source.contains("_nyx_header_probe(name, value)"),
"wire-frame branch must also emit derived HeaderEmit probes: {}",
h.source
);
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn emit_header_injection_harness_drops_wire_frame_branch_when_only_flask_imported() {
let dir = std::env::temp_dir().join("nyx_phase08_py_test_no_wire_frame");
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir).unwrap();
let entry = dir.join("vuln.py");
std::fs::write(
&entry,
"from flask import Response\n\
def run(value):\n response = Response('ok')\n response.headers['Set-Cookie'] = value\n return response\n",
)
.unwrap();
let h = emit_header_injection_harness(&make_header_spec(
entry.to_str().unwrap(),
"run",
));
assert!(
!h.source.contains("def _nyx_wire_frame_via_fixture"),
"flask-only fixture must not pull in the wire-frame helper: {}",
h.source
);
assert!(
!h.source.contains("HeaderWireFrame"),
"flask-only harness must not emit the HeaderWireFrame probe shape: {}",
h.source
);
let _ = std::fs::remove_dir_all(&dir);
}
fn make_redirect_spec(entry_file: &str, entry_name: &str) -> HarnessSpec {
let mut spec = make_spec(PayloadSlot::Param(0));
spec.expected_cap = Cap::OPEN_REDIRECT;

View file

@ -420,6 +420,27 @@ pub enum ProbePredicate {
/// expression expanded into an over-broad selector.
n: u32,
},
/// Phase 11 (Track J.9): JSON_PARSE depth-bomb predicate.
///
/// Fires when at least one drained probe carries
/// [`ProbeKind::JsonParse`] whose `depth > max_depth` OR whose
/// `excessive_depth` flag is set. The canonical attacker payload
/// is a deeply-nested JSON document (`[[[[[...]]]]]`) that drives
/// the host's parser to a recursion limit or stack-exhaustion
/// shape; the benign control is a flat or shallowly-nested
/// document that leaves the predicate clear.
///
/// Cross-cutting in the same sense as
/// [`Self::DeserializeGadgetInvoked`] /
/// [`Self::XxeEntityExpanded`] — evaluated across every drained
/// probe rather than against a single record.
JsonParseExcessiveDepth {
/// Maximum legal nesting depth. A captured probe with
/// `depth > max_depth` (or `excessive_depth = true`) fires the
/// predicate. Typical benign depths are under 8; depth-bomb
/// payloads ship 256+ nested arrays.
max_depth: u32,
},
}
/// How we decide a sandbox run confirmed the sink fired.
@ -649,6 +670,20 @@ pub fn oracle_fired_with_stubs(
if !outbound_ok {
return false;
}
// Phase 11 (Track J.9): JSON_PARSE depth-bomb cross-cutting
// predicates. Each `JsonParseExcessiveDepth { max_depth }`
// consults the captured probe channel for a
// [`ProbeKind::JsonParse`] record whose `depth > max_depth`
// OR whose `excessive_depth` flag is set.
let json_parse_ok = cross.iter().all(|p| match p {
ProbePredicate::JsonParseExcessiveDepth { max_depth } => {
probes_satisfy_json_parse_excessive(probes, *max_depth)
}
_ => true,
});
if !json_parse_ok {
return false;
}
// Phase 04 (Track J.2): SSTI render-equality cross-cutting
// predicates. Each `TemplateEvalEqual { expected }` consults
// the captured stdout body — see [`stdout_template_equals`].
@ -687,7 +722,8 @@ pub fn oracle_fired_with_stubs(
| ProbeKind::PrototypePollution { .. }
| ProbeKind::WeakKey { .. }
| ProbeKind::IdorAccess { .. }
| ProbeKind::OutboundNetwork { .. } => false,
| ProbeKind::OutboundNetwork { .. }
| ProbeKind::JsonParse { .. } => false,
}),
Oracle::OutputContains(needle) => {
let nb = needle.as_bytes();
@ -721,6 +757,7 @@ fn is_cross_cutting(pred: &ProbePredicate) -> bool {
| ProbePredicate::WeakKeyEntropy { .. }
| ProbePredicate::IdorBoundaryCrossed
| ProbePredicate::OutboundHostNotIn { .. }
| ProbePredicate::JsonParseExcessiveDepth { .. }
)
}
@ -1007,6 +1044,25 @@ fn probes_satisfy_outbound_off_list(probes: &[SinkProbe], allowlist: &[&str]) ->
})
}
/// True when at least one drained probe is a
/// [`ProbeKind::JsonParse`] record whose `depth > max_depth` OR whose
/// `excessive_depth` flag is set. Powers
/// [`ProbePredicate::JsonParseExcessiveDepth`] (Phase 11 — Track J.9).
///
/// `excessive_depth` short-circuits — a shim that already caught the
/// parser's own recursion-limit signal can emit
/// `JsonParse { depth: 0, excessive_depth: true }` without counting
/// nesting manually and still trip the predicate.
fn probes_satisfy_json_parse_excessive(probes: &[SinkProbe], max_depth: u32) -> bool {
probes.iter().any(|p| match &p.kind {
ProbeKind::JsonParse {
depth,
excessive_depth,
} => *excessive_depth || *depth > max_depth,
_ => false,
})
}
/// Returns `true` when `location` redirects to a host that is neither
/// `request_host` nor any entry of `allowlist`. Crate-visible so the
/// in-crate predicate above and the colocated tests can share one
@ -1117,7 +1173,8 @@ fn probe_satisfies_one(probe: &SinkProbe, pred: &ProbePredicate) -> bool {
| ProbePredicate::PrototypeCanaryTouched { .. }
| ProbePredicate::WeakKeyEntropy { .. }
| ProbePredicate::IdorBoundaryCrossed
| ProbePredicate::OutboundHostNotIn { .. } => true,
| ProbePredicate::OutboundHostNotIn { .. }
| ProbePredicate::JsonParseExcessiveDepth { .. } => true,
}
}
@ -1150,7 +1207,8 @@ pub fn probe_crash_signal(probe: &SinkProbe) -> Option<Signal> {
| ProbeKind::PrototypePollution { .. }
| ProbeKind::WeakKey { .. }
| ProbeKind::IdorAccess { .. }
| ProbeKind::OutboundNetwork { .. } => None,
| ProbeKind::OutboundNetwork { .. }
| ProbeKind::JsonParse { .. } => None,
}
}
@ -1724,4 +1782,60 @@ mod tests {
};
assert!(!oracle_fired(&oracle, &o, &[]));
}
fn json_parse_probe(depth: u32, excessive_depth: bool) -> SinkProbe {
SinkProbe {
sink_callee: "json.loads".into(),
args: vec![],
captured_at_ns: 1,
payload_id: "phase11-json".into(),
kind: ProbeKind::JsonParse {
depth,
excessive_depth,
},
witness: ProbeWitness::empty(),
}
}
#[test]
fn json_parse_excessive_depth_fires_when_depth_exceeds_budget() {
let oracle = Oracle::SinkProbe {
predicates: &[ProbePredicate::JsonParseExcessiveDepth { max_depth: 64 }],
};
let probes = vec![json_parse_probe(512, false)];
assert!(oracle_fired(&oracle, &outcome(), &probes));
}
#[test]
fn json_parse_excessive_depth_fires_on_short_circuit_flag_even_with_zero_depth() {
let oracle = Oracle::SinkProbe {
predicates: &[ProbePredicate::JsonParseExcessiveDepth { max_depth: 64 }],
};
// Shim caught the parser's own recursion limit and emitted
// `excessive_depth: true` without counting nesting — predicate
// should still fire.
let probes = vec![json_parse_probe(0, true)];
assert!(oracle_fired(&oracle, &outcome(), &probes));
}
#[test]
fn json_parse_excessive_depth_clears_when_depth_within_budget() {
let oracle = Oracle::SinkProbe {
predicates: &[ProbePredicate::JsonParseExcessiveDepth { max_depth: 64 }],
};
// Benign control: shallowly nested object.
let probes = vec![json_parse_probe(3, false)];
assert!(!oracle_fired(&oracle, &outcome(), &probes));
}
#[test]
fn json_parse_excessive_depth_ignores_unrelated_probe_kinds() {
let oracle = Oracle::SinkProbe {
predicates: &[ProbePredicate::JsonParseExcessiveDepth { max_depth: 64 }],
};
// A HeaderEmit probe (different kind) must not satisfy the
// predicate even if the shim emitted both for the same payload.
let probes = vec![header_emit_probe("Set-Cookie", "noise")];
assert!(!oracle_fired(&oracle, &outcome(), &probes));
}
}

View file

@ -378,6 +378,38 @@ pub enum ProbeKind {
/// case-insensitively against the allowlist entries.
host: String,
},
/// Phase 11 (Track J.9) JSON_PARSE depth observation. Stamped by
/// the per-language harness shim's instrumented JSON parser
/// (`json.loads` / `JSON.parse` / `Jackson.readTree` / `serde_json`
/// / `Yajl::Parser` / etc.) when the attacker-controlled payload
/// is decoded. `depth` records the maximum nesting depth observed
/// during parsing; the
/// [`crate::dynamic::oracle::ProbePredicate::JsonParseExcessiveDepth`]
/// predicate fires when `depth > max_depth` — the canonical
/// JSON-parser depth-bomb / stack-exhaustion shape.
///
/// `excessive_depth` is a pre-computed hint the shim sets when it
/// already knows the parser tripped a configured depth limit
/// (e.g. the parser raised on `RECURSION_LIMIT`). The oracle's
/// predicate consults `depth` directly so the hint is informational
/// — it lets host-side tooling render the probe without re-deriving
/// the verdict. Per-shim implementations may emit `depth = 0` when
/// the recursion budget tripped and the actual depth was not
/// counted; in that case `excessive_depth: true` is the load-bearing
/// field.
JsonParse {
/// Maximum nesting depth observed during the parse. Zero is
/// legal (flat JSON like `[]` or `"x"`). The oracle compares
/// against `ProbePredicate::JsonParseExcessiveDepth::max_depth`.
depth: u32,
/// Pre-computed flag set by the shim when the parser already
/// reported an excessive-depth condition (e.g. CPython's
/// `RecursionError`). The predicate fires on either
/// `depth > max_depth` OR `excessive_depth = true`, so a shim
/// that catches the parser's own limit signal can short-circuit
/// without counting nesting manually.
excessive_depth: bool,
},
}
/// Bounded forensic snapshot captured alongside a [`SinkProbe`]
@ -767,6 +799,69 @@ mod tests {
assert!(matches!(round.kind, ProbeKind::HeaderWireFrame { .. }));
}
#[test]
fn probe_kind_json_parse_round_trips_through_channel() {
let dir = TempDir::new().unwrap();
let ch = ProbeChannel::for_workdir(dir.path()).unwrap();
let mut p = sample_probe("json-depth");
p.kind = ProbeKind::JsonParse {
depth: 512,
excessive_depth: true,
};
ch.write(&p).unwrap();
let drained = ch.drain();
assert_eq!(drained.len(), 1);
match &drained[0].kind {
ProbeKind::JsonParse {
depth,
excessive_depth,
} => {
assert_eq!(*depth, 512);
assert!(*excessive_depth);
}
other => panic!("expected JsonParse, got {other:?}"),
}
}
#[test]
fn probe_kind_json_parse_serdes_with_explicit_tag() {
let p = SinkProbe {
sink_callee: "json.loads".into(),
args: vec![],
captured_at_ns: 1,
payload_id: "json-1".into(),
kind: ProbeKind::JsonParse {
depth: 7,
excessive_depth: false,
},
witness: ProbeWitness::empty(),
};
let json = serde_json::to_string(&p).unwrap();
assert!(
json.contains(r#""kind":"JsonParse""#),
"kind tag must round-trip: {json}",
);
assert!(
json.contains(r#""depth":7"#),
"depth field must round-trip: {json}",
);
assert!(
json.contains(r#""excessive_depth":false"#),
"excessive_depth field must round-trip: {json}",
);
let round: SinkProbe = serde_json::from_str(&json).unwrap();
match round.kind {
ProbeKind::JsonParse {
depth,
excessive_depth,
} => {
assert_eq!(depth, 7);
assert!(!excessive_depth);
}
other => panic!("expected JsonParse after round-trip, got {other:?}"),
}
}
#[test]
fn witness_from_inputs_redacts_and_truncates() {
let huge_payload = vec![0xAB; policy::PAYLOAD_CAPTURE_LIMIT_BYTES * 2];

View file

@ -0,0 +1,37 @@
# Phase 08 (Track J.6) — Python raw-socket HEADER_INJECTION vuln fixture.
#
# Writes the response status line and headers directly to the wire via
# `self.wfile.write`, bypassing the framework-level CRLF validator that
# werkzeug / Flask / axum / Tomcat would otherwise interpose. A payload
# carrying `\r\nSet-Cookie: ...` splits the single Set-Cookie header
# into two on the wire, producing the canonical smuggled-second-header
# shape that `ProbeKind::HeaderWireFrame` is designed to catch.
#
# The harness (`src/dynamic/lang/python.rs::emit_header_injection_harness`)
# detects the `BaseHTTPRequestHandler` import in this file and routes
# through the tier-(b) wire-frame branch: boot `HTTPServer` on a
# loopback port, issue one `GET /` over a raw socket, read the bytes
# the handler wrote to the response socket, and emit them as a
# `ProbeKind::HeaderWireFrame` record.
from http.server import BaseHTTPRequestHandler
class VulnHandler(BaseHTTPRequestHandler):
# Set by the harness before each request. Bytes go straight onto
# the wire with no encoding pass.
cookie_value: bytes = b""
def do_GET(self):
body = b"ok\n"
raw = (
b"HTTP/1.0 200 OK\r\n"
b"Content-Length: " + str(len(body)).encode("ascii") + b"\r\n"
b"Set-Cookie: " + self.__class__.cookie_value + b"\r\n"
b"\r\n"
) + body
self.wfile.write(raw)
def log_message(self, *args, **kwargs):
# Silence default stderr logging so the harness captures only
# the probe + sink-hit sentinel.
return

View file

@ -682,4 +682,85 @@ mod e2e_phase_08 {
};
assert_confirmed(Lang::Rust, &outcome);
}
// Phase 08 tier-(b): Python raw-socket wire-frame fixture.
// `tests/dynamic_fixtures/header_injection/python_raw/vuln.py` boots
// a `BaseHTTPRequestHandler` writing raw bytes via `self.wfile.write`,
// bypassing werkzeug's CRLF strip. The harness boots the handler on a
// loopback port, reads the response-header block off the socket, and
// emits a `ProbeKind::HeaderWireFrame` record. Asserts the test
// exercises the wire-frame branch (not the synthetic fallback) by
// pinning `wire_frame_len` in the captured stdout — that literal only
// appears in the tier-(b) write path.
fn build_python_raw_spec(entry_name: &str) -> (HarnessSpec, TempDir) {
let fixture_src = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests/dynamic_fixtures/header_injection/python_raw/vuln.py");
let tmp = TempDir::new().expect("create tempdir");
let dst = tmp.path().join("vuln.py");
std::fs::copy(&fixture_src, &dst).expect("copy python_raw fixture into tempdir");
let entry_file = dst.to_string_lossy().into_owned();
let mut digest = blake3::Hasher::new();
digest.update(b"phase08-e2e-header-injection|python_raw|vuln.py");
let spec_hash = format!("{:016x}", {
let bytes = digest.finalize();
u64::from_le_bytes(bytes.as_bytes()[..8].try_into().unwrap())
});
let spec = HarnessSpec {
finding_id: spec_hash.clone(),
entry_file: entry_file.clone(),
entry_name: entry_name.to_owned(),
entry_kind: EntryKind::Function,
lang: Lang::Python,
toolchain_id: default_toolchain_id(Lang::Python).into(),
payload_slot: PayloadSlot::Param(0),
expected_cap: Cap::HEADER_INJECTION,
constraint_hints: vec![],
sink_file: entry_file,
sink_line: 1,
spec_hash: spec_hash.clone(),
derivation: SpecDerivationStrategy::FromFlowSteps,
stubs_required: vec![],
framework: None,
java_toolchain: nyx_scanner::dynamic::spec::JavaToolchain::default(),
};
(spec, tmp)
}
#[test]
fn python_raw_socket_vuln_confirms_via_wire_frame_probe() {
if !command_available("python3") {
eprintln!("SKIP python_raw: missing python3");
return;
}
let _guard = FIXTURE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let (spec, _tmp) = build_python_raw_spec("run");
let opts = SandboxOptions {
backend: SandboxBackend::Process,
..SandboxOptions::default()
};
let outcome = match run_spec(&spec, &opts) {
Ok(outcome) => outcome,
Err(RunError::BuildFailed { stderr, attempts }) => {
eprintln!(
"SKIP python_raw: harness build failed after {attempts} attempts: {stderr}",
);
return;
}
Err(e) => panic!("run_spec(python_raw) errored: {e:?}"),
};
assert_confirmed(Lang::Python, &outcome);
let any_wire_frame_marker = outcome.attempts.iter().any(|a| {
String::from_utf8_lossy(&a.outcome.stdout).contains("wire_frame_len")
});
assert!(
any_wire_frame_marker,
"python_raw fixture must exercise the tier-(b) wire-frame harness branch; \
expected `wire_frame_len` substring in at least one attempt's stdout, got attempts={:?}",
outcome
.attempts
.iter()
.map(|a| String::from_utf8_lossy(&a.outcome.stdout).into_owned())
.collect::<Vec<_>>(),
);
}
}