[pitboss] phase 06: Track J.4 + Track L.4 — LDAP_INJECTION corpus + LdapTemplate / python-ldap / php-ldap adapters

This commit is contained in:
pitboss 2026-05-17 22:32:44 -05:00
parent 993bfabe28
commit b2eeaabb09
27 changed files with 2189 additions and 18 deletions

View file

@ -618,6 +618,17 @@ pub fn emit(spec: &HarnessSpec) -> Result<HarnessSource, UnsupportedReason> {
return Ok(emit_xxe_harness(spec));
}
// Phase 06 (Track J.4): short-circuit to the LDAP harness when the
// spec's expected cap is LDAP_INJECTION. The harness splices the
// payload into a `(uid=<payload>)` filter and applies the
// [`crate::dynamic::stubs::ldap_server`] RFC-4515 subset against
// the same three provisioned users; the resulting count drives a
// `ProbeKind::Ldap` probe consumed by the
// `LdapResultCountGreaterThan` oracle.
if spec.expected_cap == crate::labels::Cap::LDAP_INJECTION {
return Ok(emit_ldap_harness(spec));
}
let entry_source = read_entry_source(&spec.entry_file);
let shape = PythonShape::detect(spec, &entry_source);
let body = generate_for_shape(spec, shape);
@ -839,6 +850,140 @@ if __name__ == "__main__":
}
}
/// Phase 06 — Track J.4 LDAP-injection harness for Python
/// (`ldap.search_s`).
///
/// Reads `NYX_PAYLOAD`, splices it into a `(uid=<payload>)` filter,
/// evaluates the filter against the in-sandbox LDAP directory (three
/// users: `alice`, `bob`, `carol`) using the same RFC-4515 subset the
/// [`crate::dynamic::stubs::ldap_server`] stub implements, and writes
/// a `ProbeKind::Ldap { entries_returned }` probe whose `n` is the
/// count the directory returned. Mirrors the synthetic-harness
/// pattern used by Phase 03 / 04 / 05.
pub fn emit_ldap_harness(_spec: &HarnessSpec) -> HarnessSource {
let probe = probe_shim();
let body = format!(
r#"#!/usr/bin/env python3
"""Nyx dynamic harness — LDAP_INJECTION ldap.search_s (Phase 06 / Track J.4)."""
import os, json, sys, time
{probe}
_NYX_LDAP_USERS = ["alice", "bob", "carol"]
def _nyx_attr_match(pattern, uid):
if pattern == "*":
return True
if "*" in pattern:
prefix, _, suffix = pattern.partition("*")
return uid.startswith(prefix) and uid.endswith(suffix)
return pattern == uid
def _nyx_split_clauses(src):
out = []
i = 0
n = len(src)
while i < n:
if src[i] != "(":
i += 1
continue
depth = 0
start = i
while i < n:
c = src[i]
if c == "(":
depth += 1
elif c == ")":
depth -= 1
if depth == 0:
i += 1
break
i += 1
out.append(src[start:i])
return out
def _nyx_inner_has_break(inner):
depth = 0
for c in inner:
if c == "(":
depth += 1
elif c == ")":
depth -= 1
if depth < 0:
return True
return False
def _nyx_match_one(filt, uid):
f = filt.strip()
if not (f.startswith("(") and f.endswith(")")):
return True
inner = f[1:-1]
if _nyx_inner_has_break(inner):
return True
if inner.startswith("&") or inner.startswith("|"):
clauses = _nyx_split_clauses(inner[1:])
if not clauses:
return False
results = [_nyx_match_one(c, uid) for c in clauses]
return all(results) if inner.startswith("&") else any(results)
if "=" not in inner:
return True
attr, _, pattern = inner.partition("=")
if attr.lower() not in ("uid", "cn"):
return True
return _nyx_attr_match(pattern, uid)
def _nyx_ldap_count(filt):
f = (filt or "").strip()
if not f:
return 0
if not (f.startswith("(") and f.endswith(")")):
return len(_NYX_LDAP_USERS)
if _nyx_inner_has_break(f[1:-1]):
return len(_NYX_LDAP_USERS)
return sum(1 for u in _NYX_LDAP_USERS if _nyx_match_one(f, u))
def _nyx_ldap_probe(filt, entries_returned):
rec = {{
"sink_callee": "ldap.search_s",
"args": [{{"kind": "String", "value": filt}}],
"captured_at_ns": time.time_ns(),
"payload_id": os.environ.get("NYX_PAYLOAD_ID", ""),
"kind": {{"kind": "Ldap", "entries_returned": int(entries_returned)}},
"witness": __nyx_witness("ldap.search_s", [filt]),
}}
__nyx_emit(rec)
def _nyx_run():
payload = os.environ.get("NYX_PAYLOAD", "")
filt = "(uid=" + payload + ")"
count = _nyx_ldap_count(filt)
_nyx_ldap_probe(filt, count)
print("__NYX_SINK_HIT__", flush=True)
sys.stdout.write(json.dumps({{"filter": filt, "entries_returned": count}}) + "\n")
sys.stdout.flush()
if __name__ == "__main__":
_nyx_run()
"#
);
HarnessSource {
source: body,
filename: "harness.py".to_owned(),
command: vec!["python3".to_owned(), "harness.py".to_owned()],
extra_files: Vec::new(),
entry_subpath: None,
}
}
/// Public wrapper to detect the shape for a finalised `HarnessSpec`,
/// reading the entry file from disk. Exposed so test helpers can pin a
/// per-fixture shape without round-tripping through [`emit`].