mirror of
https://github.com/elicpeter/nyx.git
synced 2026-06-09 19:45:13 +02:00
[pitboss] phase 07: Track J.5 + Track L.5 — XPATH_INJECTION corpus + XPath / DOM / lxml adapters
This commit is contained in:
parent
b2eeaabb09
commit
a32075a756
38 changed files with 2111 additions and 67 deletions
|
|
@ -624,11 +624,22 @@ pub fn emit(spec: &HarnessSpec) -> Result<HarnessSource, UnsupportedReason> {
|
|||
// [`crate::dynamic::stubs::ldap_server`] RFC-4515 subset against
|
||||
// the same three provisioned users; the resulting count drives a
|
||||
// `ProbeKind::Ldap` probe consumed by the
|
||||
// `LdapResultCountGreaterThan` oracle.
|
||||
// `QueryResultCountGreaterThan` oracle.
|
||||
if spec.expected_cap == crate::labels::Cap::LDAP_INJECTION {
|
||||
return Ok(emit_ldap_harness(spec));
|
||||
}
|
||||
|
||||
// Phase 07 (Track J.5): short-circuit to the XPath harness when
|
||||
// the spec's expected cap is XPATH_INJECTION. The harness
|
||||
// splices the payload into a `//user[@name='<payload>']`
|
||||
// expression and counts matching nodes against the canonical
|
||||
// staged document; the resulting count drives a
|
||||
// `ProbeKind::Xpath` probe consumed by the
|
||||
// `QueryResultCountGreaterThan` oracle.
|
||||
if spec.expected_cap == crate::labels::Cap::XPATH_INJECTION {
|
||||
return Ok(emit_xpath_harness(spec));
|
||||
}
|
||||
|
||||
let entry_source = read_entry_source(&spec.entry_file);
|
||||
let shape = PythonShape::detect(spec, &entry_source);
|
||||
let body = generate_for_shape(spec, shape);
|
||||
|
|
@ -984,6 +995,96 @@ if __name__ == "__main__":
|
|||
}
|
||||
}
|
||||
|
||||
/// Phase 07 — Track J.5 XPath-injection harness for Python
|
||||
/// (`lxml.etree.xpath`).
|
||||
///
|
||||
/// Reads `NYX_PAYLOAD`, splices it into a `//user[@name='<payload>']`
|
||||
/// expression, counts matching `<user>` nodes against the canonical
|
||||
/// staged document, and writes a `ProbeKind::Xpath { nodes_returned }`
|
||||
/// probe whose `n` is the count returned. Mirrors the
|
||||
/// synthetic-harness pattern used by Phase 03 / 04 / 05 / 06.
|
||||
pub fn emit_xpath_harness(_spec: &HarnessSpec) -> HarnessSource {
|
||||
let probe = probe_shim();
|
||||
let corpus_filename = crate::dynamic::stubs::xpath_document::XPATH_CORPUS_FILENAME;
|
||||
let corpus_xml = crate::dynamic::stubs::xpath_document::XPATH_CORPUS_XML;
|
||||
let body = format!(
|
||||
r#"#!/usr/bin/env python3
|
||||
"""Nyx dynamic harness — XPATH_INJECTION lxml.etree.xpath (Phase 07 / Track J.5)."""
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
{probe}
|
||||
|
||||
_NYX_XPATH_USERS = ["alice", "bob", "carol"]
|
||||
|
||||
|
||||
def _nyx_xpath_select(expr):
|
||||
needle = "//user[@name="
|
||||
if not expr.startswith(needle):
|
||||
return 0
|
||||
rest = expr[len(needle):]
|
||||
if not rest.endswith("]"):
|
||||
return 0
|
||||
predicate = rest[:-1]
|
||||
m = re.match(r"^'([^']*)'(.*)$", predicate)
|
||||
if m is not None:
|
||||
literal = m.group(1)
|
||||
tail = m.group(2).strip()
|
||||
if tail == "" or tail == "]":
|
||||
return sum(1 for u in _NYX_XPATH_USERS if u == literal)
|
||||
if re.match(r"^or\s+", tail, re.IGNORECASE):
|
||||
return len(_NYX_XPATH_USERS)
|
||||
m = re.match(r'^"([^"]*)"\s*$', predicate)
|
||||
if m is not None:
|
||||
literal = m.group(1)
|
||||
return sum(1 for u in _NYX_XPATH_USERS if u == literal)
|
||||
if re.match(r"^concat\(", predicate, re.IGNORECASE):
|
||||
parts = re.findall(r"'([^']*)'", predicate)
|
||||
joined = "".join(p for p in parts if p not in (',"',))
|
||||
joined = joined.replace(",\"'\",", "'")
|
||||
return sum(1 for u in _NYX_XPATH_USERS if u == joined)
|
||||
return len(_NYX_XPATH_USERS)
|
||||
|
||||
|
||||
def _nyx_xpath_probe(expr, nodes_returned):
|
||||
rec = {{
|
||||
"sink_callee": "lxml.etree.xpath",
|
||||
"args": [{{"kind": "String", "value": expr}}],
|
||||
"captured_at_ns": time.time_ns(),
|
||||
"payload_id": os.environ.get("NYX_PAYLOAD_ID", ""),
|
||||
"kind": {{"kind": "Xpath", "nodes_returned": int(nodes_returned)}},
|
||||
"witness": __nyx_witness("lxml.etree.xpath", [expr]),
|
||||
}}
|
||||
__nyx_emit(rec)
|
||||
|
||||
|
||||
def _nyx_run():
|
||||
payload = os.environ.get("NYX_PAYLOAD", "")
|
||||
expr = "//user[@name='" + payload + "']"
|
||||
nodes = _nyx_xpath_select(expr)
|
||||
_nyx_xpath_probe(expr, nodes)
|
||||
print("__NYX_SINK_HIT__", flush=True)
|
||||
sys.stdout.write(json.dumps({{"expr": expr, "nodes_returned": nodes}}) + "\n")
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
_nyx_run()
|
||||
"#
|
||||
);
|
||||
let extra_files = vec![(corpus_filename.to_owned(), corpus_xml.to_owned())];
|
||||
HarnessSource {
|
||||
source: body,
|
||||
filename: "harness.py".to_owned(),
|
||||
command: vec!["python3".to_owned(), "harness.py".to_owned()],
|
||||
extra_files,
|
||||
entry_subpath: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Public wrapper to detect the shape for a finalised `HarnessSpec`,
|
||||
/// reading the entry file from disk. Exposed so test helpers can pin a
|
||||
/// per-fixture shape without round-tripping through [`emit`].
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue