nyx/scripts/corpus_dashboard.py

285 lines
12 KiB
Python
Executable file

#!/usr/bin/env python3
"""Corpus health report for src/dynamic/corpus.rs.
Produces:
- Per-cap coverage table (payload count, benign controls, OOB slots)
- Per-payload last-confirmed timestamp (from repro artifacts if present)
- CVE reference count
- Marker collision audit
Exit code 0 = healthy. Non-zero = collision or missing coverage.
Usage:
python3 scripts/corpus_dashboard.py [--repro-dir REPRO_DIR] [--json]
"""
import argparse
import json
import os
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
# ── Payload table (mirrors src/dynamic/corpus.rs) ────────────────────────────
# Manually synced; CI should flag drift via cargo test no_marker_collisions.
CORPUS_VERSION = 3
@dataclass
class PayloadEntry:
cap: str
label: str
bytes_repr: str
oracle_kind: str
oracle_value: Optional[str]
is_benign: bool
provenance: str
since_corpus_version: int
deprecated_at_corpus_version: Optional[int]
fixture_paths: list[str]
oob_nonce_slot: bool
cve_refs: list[str] = field(default_factory=list)
PAYLOADS: list[PayloadEntry] = [
# ── SQL_QUERY ──────────────────────────────────────────────────────────────
PayloadEntry(
cap="SQL_QUERY", label="sqli-tautology",
bytes_repr="' OR '1'='1", oracle_kind="OutputContains",
oracle_value="NYX_SQL_CONFIRMED", is_benign=False,
provenance="Curated", since_corpus_version=1,
deprecated_at_corpus_version=None,
fixture_paths=["tests/benchmark/corpus/rust/sqli/sqli_rusqlite_format.rs"],
oob_nonce_slot=False,
),
PayloadEntry(
cap="SQL_QUERY", label="sqli-union-nyx",
bytes_repr="' UNION SELECT 'NYX_SQL_CONFIRMED'--",
oracle_kind="OutputContains", oracle_value="NYX_SQL_CONFIRMED",
is_benign=False, provenance="Curated", since_corpus_version=1,
deprecated_at_corpus_version=None,
fixture_paths=["tests/benchmark/corpus/rust/sqli/sqli_rusqlite_format.rs"],
oob_nonce_slot=False,
),
# ── CODE_EXEC ──────────────────────────────────────────────────────────────
PayloadEntry(
cap="CODE_EXEC", label="cmdi-echo-marker",
bytes_repr="; echo NYX_PWN_CMDI",
oracle_kind="OutputContains", oracle_value="NYX_PWN_CMDI",
is_benign=False, provenance="Curated", since_corpus_version=1,
deprecated_at_corpus_version=None,
fixture_paths=[
"tests/benchmark/corpus/rust/cmdi/cmdi_command.rs",
"tests/benchmark/corpus/rust/cmdi/cmdi_args.rs",
],
oob_nonce_slot=False,
),
# ── FILE_IO ────────────────────────────────────────────────────────────────
PayloadEntry(
cap="FILE_IO", label="path-traversal-passwd",
bytes_repr="../../../../etc/passwd",
oracle_kind="OutputContains", oracle_value="root:",
is_benign=False, provenance="Curated", since_corpus_version=1,
deprecated_at_corpus_version=None,
fixture_paths=[
"tests/benchmark/corpus/rust/path_traversal/path_file_open.rs",
"tests/benchmark/corpus/rust/path_traversal/path_read.rs",
],
oob_nonce_slot=False,
),
PayloadEntry(
cap="FILE_IO", label="path-traversal-benign",
bytes_repr="benign_safe_file_that_does_not_exist_NYX_BENIGN",
oracle_kind="OutputContains", oracle_value="root:",
is_benign=True, provenance="Curated", since_corpus_version=1,
deprecated_at_corpus_version=None,
fixture_paths=["tests/benchmark/corpus/rust/path_traversal/path_file_open.rs"],
oob_nonce_slot=False,
),
# ── SSRF ───────────────────────────────────────────────────────────────────
PayloadEntry(
cap="SSRF", label="ssrf-file-scheme",
bytes_repr="file:///etc/passwd",
oracle_kind="OutputContains", oracle_value="daemon:",
is_benign=False, provenance="Curated", since_corpus_version=1,
deprecated_at_corpus_version=None,
fixture_paths=["tests/benchmark/corpus/rust/ssrf/ssrf_reqwest.rs"],
oob_nonce_slot=False,
),
PayloadEntry(
cap="SSRF", label="ssrf-oob-nonce",
bytes_repr="<OOB URL generated at runtime>",
oracle_kind="OobCallback", oracle_value="host=127.0.0.1",
is_benign=False, provenance="Curated", since_corpus_version=2,
deprecated_at_corpus_version=None,
fixture_paths=["tests/benchmark/corpus/rust/ssrf/ssrf_reqwest.rs"],
oob_nonce_slot=True,
),
# ── HTML_ESCAPE ────────────────────────────────────────────────────────────
PayloadEntry(
cap="HTML_ESCAPE", label="xss-script-marker",
bytes_repr="<script>NYX_XSS_CONFIRMED</script>",
oracle_kind="OutputContains",
oracle_value="<script>NYX_XSS_CONFIRMED</script>",
is_benign=False, provenance="Curated", since_corpus_version=1,
deprecated_at_corpus_version=None,
fixture_paths=["tests/benchmark/corpus/rust/xss/axum_html/main.rs"],
oob_nonce_slot=False,
),
PayloadEntry(
cap="HTML_ESCAPE", label="xss-benign-text",
bytes_repr="Hello World",
oracle_kind="OutputContains",
oracle_value="<script>NYX_XSS_CONFIRMED</script>",
is_benign=True, provenance="Curated", since_corpus_version=1,
deprecated_at_corpus_version=None,
fixture_paths=["tests/benchmark/corpus/rust/xss/axum_html/main.rs"],
oob_nonce_slot=False,
),
]
ALL_CAPS = ["SQL_QUERY", "CODE_EXEC", "FILE_IO", "SSRF", "HTML_ESCAPE"]
# ── Marker collision audit ────────────────────────────────────────────────────
def audit_marker_collisions() -> list[tuple[str, str, str]]:
collisions = []
for p in PAYLOADS:
if p.is_benign or p.oracle_kind != "OutputContains":
continue
marker = p.oracle_value or ""
for other in PAYLOADS:
if other.cap == p.cap:
continue
if other.is_benign or other.oob_nonce_slot:
continue
if marker in other.bytes_repr:
collisions.append((p.cap, p.label, other.cap))
return collisions
# ── Coverage table ────────────────────────────────────────────────────────────
def build_coverage_table() -> dict:
result = {}
for cap in ALL_CAPS:
cap_payloads = [p for p in PAYLOADS if p.cap == cap]
result[cap] = {
"total": len(cap_payloads),
"vuln": sum(1 for p in cap_payloads if not p.is_benign),
"benign": sum(1 for p in cap_payloads if p.is_benign),
"oob_slots": sum(1 for p in cap_payloads if p.oob_nonce_slot),
"has_fixture_paths": all(len(p.fixture_paths) > 0 for p in cap_payloads),
"payloads": [p.label for p in cap_payloads],
}
return result
# ── Repro artifact timestamps ─────────────────────────────────────────────────
def scan_last_confirmed(repro_dir: Path) -> dict[str, str]:
"""Return {payload_label: iso_timestamp} from repro artifact metadata."""
timestamps: dict[str, str] = {}
if not repro_dir.exists():
return timestamps
for meta_file in repro_dir.rglob("*.json"):
try:
data = json.loads(meta_file.read_text())
label = data.get("payload_label", "")
ts = data.get("confirmed_at", "")
if label and ts:
# Keep most recent.
if label not in timestamps or ts > timestamps[label]:
timestamps[label] = ts
except (json.JSONDecodeError, KeyError):
pass
return timestamps
# ── fuzz-discovered count ─────────────────────────────────────────────────────
def count_discovered(discovered_dir: Path) -> int:
if not discovered_dir.exists():
return 0
return sum(
1 for f in discovered_dir.rglob("*")
if f.is_file() and not f.name.endswith(".json") and f.name != ".gitkeep"
)
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(description="Nyx corpus health dashboard")
parser.add_argument("--repro-dir", default="repro", help="Path to repro artifacts")
parser.add_argument("--discovered-dir", default="fuzz-discovered",
help="Path to fuzz-discovered/ directory")
parser.add_argument("--json", action="store_true", help="Output JSON instead of text")
args = parser.parse_args()
# Change to repo root (parent of scripts/).
repo_root = Path(__file__).parent.parent
os.chdir(repo_root)
collisions = audit_marker_collisions()
coverage = build_coverage_table()
timestamps = scan_last_confirmed(Path(args.repro_dir))
discovered_count = count_discovered(Path(args.discovered_dir))
report = {
"corpus_version": CORPUS_VERSION,
"total_payloads": len(PAYLOADS),
"coverage": coverage,
"marker_collisions": collisions,
"last_confirmed": timestamps,
"fuzz_discovered_pending": discovered_count,
"healthy": len(collisions) == 0,
}
if args.json:
print(json.dumps(report, indent=2))
return 0 if report["healthy"] else 1
# Text output.
print(f"Nyx Corpus Dashboard (corpus_version={CORPUS_VERSION})")
print("=" * 60)
print()
# Coverage table.
print("Per-cap coverage:")
hdr = f" {'Cap':<18} {'Total':>5} {'Vuln':>5} {'Benign':>6} {'OOB':>4} {'Fixtures':>8}"
print(hdr)
print(" " + "-" * 52)
for cap, info in coverage.items():
fixture_ok = "ok" if info["has_fixture_paths"] else "MISSING"
print(
f" {cap:<18} {info['total']:>5} {info['vuln']:>5} "
f"{info['benign']:>6} {info['oob_slots']:>4} {fixture_ok:>8}"
)
print()
# Last confirmed timestamps.
if timestamps:
print("Last confirmed timestamps:")
for label, ts in sorted(timestamps.items()):
print(f" {label:<35} {ts}")
print()
# fuzz-discovered pending.
print(f"Fuzz-discovered pending promotion: {discovered_count}")
print()
# Marker collisions.
if collisions:
print("FAIL: Marker collisions detected (§16.3):")
for cap, label, other_cap in collisions:
print(f" {cap}/{label} marker appears in {other_cap} payload bytes")
return 1
else:
print("OK: No marker collisions detected.")
return 0
if __name__ == "__main__":
sys.exit(main())