[pitboss/grind] deferred session-0015 (20260521T201327Z-3848)

This commit is contained in:
pitboss 2026-05-21 19:28:23 -05:00
parent c95dcc00fb
commit 3b62269a04
2 changed files with 539 additions and 11 deletions

View file

@ -1889,12 +1889,93 @@ fn derive_module_name(entry_file: &str) -> String {
/// pre-encoded via `urllib.parse.quote`, so the captured value
/// carries `%0D%0A` (not the raw bytes) and the predicate stays
/// clear.
pub fn emit_header_injection_harness(_spec: &HarnessSpec) -> HarnessSource {
pub fn emit_header_injection_harness(spec: &HarnessSpec) -> HarnessSource {
let probe = probe_shim();
let entry_source = read_entry_source(&spec.entry_file);
let module_name = derive_module_name(&spec.entry_file);
let entry_name = if spec.entry_name.is_empty() {
"run".to_owned()
} else {
spec.entry_name.clone()
};
let uses_flask = entry_source.contains("from flask")
|| entry_source.contains("import flask")
|| entry_source.contains("werkzeug.wrappers");
let via_fixture = if uses_flask {
format!(
r#"def _nyx_header_via_fixture(payload):
# Phase 08 tier-(a): import the fixture, monkey-patch
# `werkzeug.datastructures.Headers.__setitem__` to capture every
# name/value pair the fixture writes *before* werkzeug's strict
# CRLF validator runs. This mirrors the Java permissive servlet
# stub at `src/dynamic/lang/java_servlet_stubs.rs::http_servlet_response`,
# so a vuln payload with raw `\r\n` is recorded verbatim and a
# benign control whose bytes are URL-encoded is recorded
# URL-encoded. Returns `None` when werkzeug is missing or the
# fixture cannot be imported so the caller can fall back to the
# inline synthetic probe.
try:
import werkzeug.datastructures as _wzd
except Exception:
return None
captured = []
_orig_setitem = _wzd.Headers.__setitem__
def _nyx_setitem(self, key, value):
try:
captured.append((str(key), str(value)))
except Exception:
pass
try:
_orig_setitem(self, key, value)
except Exception:
# werkzeug>=2.x rejects CRLF in header values. Swallow
# the validator's exception so the captor still records
# the would-have-been-written bytes.
pass
_wzd.Headers.__setitem__ = _nyx_setitem
sys.path.insert(0, ".")
try:
try:
mod = importlib.import_module("{module_name}")
except Exception:
return None
fn = getattr(mod, "{entry_name}", None)
if fn is None:
return None
try:
fn(payload)
except Exception:
# Fixture itself raised (validator path, missing dep, etc.)
# return whatever the captor recorded before the throw.
pass
return captured
finally:
_wzd.Headers.__setitem__ = _orig_setitem
"#
)
} else {
String::new()
};
let invoke_via_fixture = if uses_flask {
r#" captured = _nyx_header_via_fixture(payload)
if captured:
for name, value in captured:
_nyx_header_probe(name, value)
print("__NYX_SINK_HIT__", flush=True)
sys.stdout.write(json.dumps({"headers": [list(p) for p in captured]}) + "\n")
sys.stdout.flush()
return
"#
} else {
""
};
let importlib_import = if uses_flask { "import importlib\n" } else { "" };
let body = format!(
r#"#!/usr/bin/env python3
"""Nyx dynamic harness — HEADER_INJECTION flask.Response.headers.__setitem__ (Phase 08 / Track J.6)."""
import json
{importlib_import}import json
import os
import sys
import time
@ -1917,9 +1998,9 @@ def _nyx_header_probe(name, value):
__nyx_emit(rec)
def _nyx_run():
{via_fixture}def _nyx_run():
payload = os.environ.get("NYX_PAYLOAD", "")
# Synthetic instrumented setter mirrors
{invoke_via_fixture} # Synthetic fallback mirrors
# `werkzeug.datastructures.Headers.__setitem__` semantics: the
# value bytes flow through unmodified, so a tainted payload that
# carries raw `\r\n` lands on the wire as a header split.
@ -1954,12 +2035,74 @@ if __name__ == "__main__":
/// [`crate::dynamic::oracle::ProbePredicate::RedirectHostNotIn`]
/// oracle; the paired benign control redirects to a same-origin
/// path and leaves the predicate clear.
pub fn emit_open_redirect_harness(_spec: &HarnessSpec) -> HarnessSource {
pub fn emit_open_redirect_harness(spec: &HarnessSpec) -> HarnessSource {
let probe = probe_shim();
let entry_source = read_entry_source(&spec.entry_file);
let module_name = derive_module_name(&spec.entry_file);
let entry_name = if spec.entry_name.is_empty() {
"run".to_owned()
} else {
spec.entry_name.clone()
};
let uses_flask =
entry_source.contains("from flask") || entry_source.contains("import flask");
let via_fixture = if uses_flask {
format!(
r#"def _nyx_redirect_via_fixture(payload):
# Phase 09 tier-(a): import the fixture, call its `{entry_name}` so
# the real `flask.redirect` runs, then read the bound `Location:`
# header off the returned response. Returns `(location, request_host)`
# on success, or `None` when the import / call fails so the caller
# can fall back to the inline synthetic probe.
sys.path.insert(0, ".")
try:
mod = importlib.import_module("{module_name}")
except Exception:
return None
fn = getattr(mod, "{entry_name}", None)
if fn is None:
return None
try:
response = fn(payload)
except Exception:
# Fixture raised (validator path, missing dep, etc.) drop
# tier-(a) and let the caller fall back.
return None
try:
location = response.headers.get("Location", "")
except Exception:
return None
if not isinstance(location, str):
try:
location = str(location)
except Exception:
return None
return (location, "example.com")
"#
)
} else {
String::new()
};
let invoke_via_fixture = if uses_flask {
r#" captured = _nyx_redirect_via_fixture(payload)
if captured is not None:
location, request_host = captured
_nyx_redirect_probe(location, request_host)
print("__NYX_SINK_HIT__", flush=True)
sys.stdout.write(json.dumps({"location": location, "request_host": request_host}) + "\n")
sys.stdout.flush()
return
"#
} else {
""
};
let importlib_import = if uses_flask { "import importlib\n" } else { "" };
let body = format!(
r#"#!/usr/bin/env python3
"""Nyx dynamic harness — OPEN_REDIRECT flask.redirect (Phase 09 / Track J.7)."""
import json
{importlib_import}import json
import os
import sys
import time
@ -1985,9 +2128,9 @@ def _nyx_redirect_probe(location, request_host):
__nyx_emit(rec)
def _nyx_run():
{via_fixture}def _nyx_run():
payload = os.environ.get("NYX_PAYLOAD", "")
request_host = "example.com"
{invoke_via_fixture} request_host = "example.com"
location = payload
_nyx_redirect_probe(location, request_host)
print("__NYX_SINK_HIT__", flush=True)
@ -3049,4 +3192,209 @@ mod tests {
"module name must come from the entry-file stem, not a hard-coded literal",
);
}
fn make_header_spec(entry_file: &str, entry_name: &str) -> HarnessSpec {
let mut spec = make_spec(PayloadSlot::Param(0));
spec.expected_cap = Cap::HEADER_INJECTION;
spec.entry_file = entry_file.to_owned();
spec.entry_name = entry_name.to_owned();
spec
}
#[test]
fn emit_header_injection_harness_routes_through_fixture_when_flask_imported() {
let dir = std::env::temp_dir().join("nyx_phase08_py_test_drive_fixture");
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir).unwrap();
let entry = dir.join("vuln.py");
std::fs::write(
&entry,
"from flask import Response\n\
def run(value):\n response = Response('ok')\n response.headers['Set-Cookie'] = value\n return response\n",
)
.unwrap();
let h = emit_header_injection_harness(&make_header_spec(
entry.to_str().unwrap(),
"run",
));
assert!(
h.source.contains("def _nyx_header_via_fixture(payload):"),
"tier-(a) harness must define the fixture-routing helper: {}",
h.source
);
assert!(
h.source.contains("import werkzeug.datastructures"),
"tier-(a) harness must monkey-patch werkzeug Headers: {}",
h.source
);
assert!(
h.source.contains("_wzd.Headers.__setitem__ = _nyx_setitem"),
"tier-(a) harness must install the permissive captor: {}",
h.source
);
assert!(
h.source.contains("importlib.import_module(\"vuln\")"),
"tier-(a) harness must import the fixture by its file stem: {}",
h.source
);
assert!(
h.source.contains("getattr(mod, \"run\", None)"),
"tier-(a) harness must look up the named entry function: {}",
h.source
);
assert!(
h.source.contains("captured = _nyx_header_via_fixture(payload)"),
"harness main must call the fixture-routing helper first: {}",
h.source
);
assert!(
h.source
.contains("_nyx_header_probe(\"Set-Cookie\", payload)")
|| h.source.contains("value = payload\n _nyx_header_probe(name, value)"),
"fallback path must still emit a synthetic probe: {}",
h.source
);
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn emit_header_injection_harness_falls_back_when_flask_not_imported() {
let dir = std::env::temp_dir().join("nyx_phase08_py_test_no_flask");
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir).unwrap();
let entry = dir.join("vuln.py");
std::fs::write(
&entry,
"def run(value):\n return value\n",
)
.unwrap();
let h = emit_header_injection_harness(&make_header_spec(
entry.to_str().unwrap(),
"run",
));
assert!(
!h.source.contains("import werkzeug.datastructures"),
"fallback path must not import werkzeug: {}",
h.source
);
assert!(
!h.source.contains("def _nyx_header_via_fixture"),
"fallback path must not define the fixture-routing helper: {}",
h.source
);
assert!(
h.source.contains("value = payload\n _nyx_header_probe(name, value)"),
"fallback path must keep the synthetic probe: {}",
h.source
);
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn emit_header_injection_harness_derives_module_name_from_entry_file() {
let dir = std::env::temp_dir().join("nyx_phase08_py_test_module_derive");
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir).unwrap();
let entry = dir.join("benign.py");
std::fs::write(
&entry,
"from flask import Response\n\
def run(v):\n return Response('ok')\n",
)
.unwrap();
let h = emit_header_injection_harness(&make_header_spec(
entry.to_str().unwrap(),
"run",
));
assert!(
h.source.contains("importlib.import_module(\"benign\")"),
"module name must come from the entry-file stem: {}",
h.source
);
let _ = std::fs::remove_dir_all(&dir);
}
fn make_redirect_spec(entry_file: &str, entry_name: &str) -> HarnessSpec {
let mut spec = make_spec(PayloadSlot::Param(0));
spec.expected_cap = Cap::OPEN_REDIRECT;
spec.entry_file = entry_file.to_owned();
spec.entry_name = entry_name.to_owned();
spec
}
#[test]
fn emit_open_redirect_harness_routes_through_fixture_when_flask_imported() {
let dir = std::env::temp_dir().join("nyx_phase09_py_test_drive_fixture");
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir).unwrap();
let entry = dir.join("vuln.py");
std::fs::write(
&entry,
"from flask import redirect\ndef run(value):\n return redirect(value)\n",
)
.unwrap();
let h = emit_open_redirect_harness(&make_redirect_spec(
entry.to_str().unwrap(),
"run",
));
assert!(
h.source.contains("def _nyx_redirect_via_fixture(payload):"),
"tier-(a) harness must define the fixture-routing helper: {}",
h.source
);
assert!(
h.source.contains("importlib.import_module(\"vuln\")"),
"tier-(a) harness must import the fixture by its file stem: {}",
h.source
);
assert!(
h.source.contains("response.headers.get(\"Location\", \"\")"),
"tier-(a) harness must read the Location header off the returned response: {}",
h.source
);
assert!(
h.source.contains("captured = _nyx_redirect_via_fixture(payload)"),
"harness main must call the fixture-routing helper first: {}",
h.source
);
assert!(
h.source.contains("location = payload\n _nyx_redirect_probe(location, request_host)"),
"fallback path must keep the synthetic probe: {}",
h.source
);
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn emit_open_redirect_harness_falls_back_when_flask_not_imported() {
let dir = std::env::temp_dir().join("nyx_phase09_py_test_no_flask");
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir).unwrap();
let entry = dir.join("vuln.py");
std::fs::write(
&entry,
"def run(value):\n return value\n",
)
.unwrap();
let h = emit_open_redirect_harness(&make_redirect_spec(
entry.to_str().unwrap(),
"run",
));
assert!(
!h.source.contains("def _nyx_redirect_via_fixture"),
"fallback path must not define the fixture-routing helper: {}",
h.source
);
assert!(
!h.source.contains("importlib.import_module"),
"fallback path must not import the fixture: {}",
h.source
);
assert!(
h.source.contains("location = payload\n _nyx_redirect_probe(location, request_host)"),
"fallback path must keep the synthetic probe: {}",
h.source
);
let _ = std::fs::remove_dir_all(&dir);
}
}