#!/usr/bin/env python3 """ Tabulate nyx scan results against a ground-truth file. For OWASP / SARD sets: compares nyx findings against known-true/known-false labels from the ground truth JSON. For in-house sets (--inhouse): counts findings by cap x language; reports Unsupported rate only (no ground truth required). Output: appends a result record to --append FILE. Phase 29 (Track I) extensions: --budget tests/eval_corpus/budget.toml enforce per-cell budget thresholds --diff previous.json compare against prior result file, fail on monotonic-improvement regression Exit codes: 0 all rows pass. 2 one or more per-cell budgets exceeded OR a diff regression was found. 3 malformed budget / diff input (callers must fix configuration). """ import argparse import json import os import sys from collections import defaultdict from pathlib import Path try: import tomllib # Python 3.11+ except ModuleNotFoundError: # pragma: no cover — older interpreters only import tomli as tomllib # type: ignore[no-redef] LINE_TOLERANCE = 5 # Caps with no sound runtime oracle (config / usage smells) and the catch-all # `other` bucket route to Unsupported by design, so their Unsupported-rate is # report-only, never gated. Mirrors report.py / the budget.toml intent. NO_SOUND_ORACLE_CAPS = {"auth", "crypto", "xss", "trustbound", "other"} def _soft_unsupported() -> bool: """True when the per-cell Unsupported-rate budget is report-only. CI sets `NYX_EVAL_SOFT_UNSUPPORTED` because dynamic confirmation is environment-constrained there (the budget is calibrated on a dev box where confirmation runs fully); the precision / confirmed-rate ratchets stay hard. Unset (local dev) keeps the Unsupported budget hard. """ return os.environ.get("NYX_EVAL_SOFT_UNSUPPORTED", "").strip().lower() in ( "1", "true", "yes", "on", ) # Bitflag positions for Cap (src/labels/mod.rs). Sink bits map to a cap label. _CAP_BIT_TABLE = [ (1 << 5, "path_traversal"), # FILE_IO (1 << 6, "fmt_string"), (1 << 7, "sqli"), # SQL_QUERY (1 << 8, "deserialize"), (1 << 9, "ssrf"), (1 << 10, "cmdi"), # CODE_EXEC (1 << 11, "crypto"), (1 << 12, "unauthorized_id"), (1 << 13, "data_exfil"), (1 << 14, "ldap_injection"), (1 << 15, "xpath_injection"), (1 << 16, "header_injection"), (1 << 17, "redirect"), # OPEN_REDIRECT (1 << 18, "xss"), # SSTI (template_injection); also covers XSS sinks (1 << 19, "xxe"), (1 << 20, "prototype_pollution"), # HTML_ESCAPE (1<<1) is the universal reflected-XSS *sink* cap across every # language (`grep 'Sink(Cap::HTML_ESCAPE)' src/labels/` — PHP echo, JS # innerHTML, Java servlet writers, etc.); the same bit is the html-escape # *sanitizer* cap, so a finding only carries it as a sink when an un-encoded # tainted value reached an HTML output. Placed LAST so any higher-priority # sink bit (SQL_QUERY, FILE_IO, ...) on the same finding wins; a finding # carrying only HTML_ESCAPE is reflected XSS. Without this, every # taint-based reflected-XSS finding mis-buckets to "other". (1 << 1, "xss"), ] # Static lens (see --static): SHELL_ESCAPE (1<<2) is the command-injection sink # cap for *every* language (`grep SHELL_ESCAPE src/labels/` — all Sink uses are # command-exec; CODE_EXEC=1<<10 is the eval/code-exec variant, also cmdi). In a # normal `nyx scan` (no dynamic confirmation) a Java cmdi finding carries only # SHELL_ESCAPE; the SHELL_ESCAPE→CODE_EXEC remap that buckets it as cmdi is gated # on VerifyStatus::Confirmed (src/commands/scan.rs), so with 0 confirmations the # default table leaves these in "other" and the cmdi cell reads 0/0/N. The # static lens appends SHELL_ESCAPE→cmdi at the LOWEST priority (after every other # bit) so a SHELL_ESCAPE-only finding buckets as cmdi while a finding that also # carries a higher-priority sink bit (e.g. FILE_IO) keeps its existing bucket. # Opt-in via --static so the default confirmed-recall bucketing is byte-identical. _CAP_BIT_TABLE_STATIC = _CAP_BIT_TABLE + [(1 << 2, "cmdi")] # SHELL_ESCAPE # Substring → cap lookup for rule IDs. Order matters: most specific first. _CAP_RULE_TABLE = [ ("path_traversal", "path_traversal"), ("sql", "sqli"), ("xss", "xss"), ("ssrf", "ssrf"), ("cmdi", "cmdi"), ("cmd_exec", "cmdi"), ("code_exec", "cmdi"), ("deser", "deserialize"), ("unserialize", "deserialize"), ("redirect", "redirect"), ("xxe", "xxe"), ("template", "xss"), ("auth", "auth"), ("memory", "memory"), ("crypto", "crypto"), ("data-exfil", "data_exfil"), ("data_exfil", "data_exfil"), ("header", "header_injection"), ] def load_json(path: str) -> object: with open(path) as f: return json.load(f) def cap_of(finding: dict, static_lens: bool = False) -> str: # 1. Prefer evidence.sink_caps bitmask — the engine's own classification. ev = finding.get("evidence", {}) or {} sink_caps = ev.get("sink_caps") if isinstance(sink_caps, int) and sink_caps: table = _CAP_BIT_TABLE_STATIC if static_lens else _CAP_BIT_TABLE for bit, name in table: if sink_caps & bit: return name # 2. Fall back to rule id substring (e.g. py.cmdi.os_system, java.deser.readobject). rid = (finding.get("id") or "").lower() head = rid.split(" ", 1)[0] for needle, cap in _CAP_RULE_TABLE: if needle in head: return cap return "other" def lang_of(finding: dict) -> str: path = finding.get("path", "") ext_map = { ".py": "python", ".js": "javascript", ".ts": "typescript", ".java": "java", ".go": "go", ".php": "php", ".rb": "ruby", ".rs": "rust", ".c": "c", ".cpp": "cpp", } for ext, lang in ext_map.items(): if path.endswith(ext): return lang return "unknown" def _norm_path(p: str) -> str: return p.replace("\\", "/") def path_matches(gt_path: str, finding_path: str) -> bool: """True when a ground-truth path refers to the same file as a finding path. Ground-truth paths are stored *relative to the corpus root* so the checked-in JSON stays portable, while nyx emits absolute paths rooted at wherever the corpus was cloned. Match on a path-component-aligned suffix so the relative GT path matches the absolute finding path (and the reverse, to keep a legacy absolute GT file working). Exact equality is the fast path; the `/` boundary stops `.../BenchmarkTest1.java` from matching `.../xBenchmarkTest1.java`. """ g = _norm_path(gt_path) f = _norm_path(finding_path) return g == f or f.endswith("/" + g) or g.endswith("/" + f) # ── Budget loading ────────────────────────────────────────────────────────── def load_budget(path: str) -> dict: """Parse a budget.toml file. Returns a dict:: { "default": {"unsupported_rate": 0.8, "false_confirmed_rate": 0.02, "repro_stability": 0.95, "ratchet_deadline": "..."}, "cells": {(cap, lang): {...overrides...}, ...}, } Raises SystemExit(3) on a malformed file. """ try: with open(path, "rb") as f: raw = tomllib.load(f) except FileNotFoundError: print(f"ERROR budget file not found: {path}", file=sys.stderr) sys.exit(3) except tomllib.TOMLDecodeError as e: print(f"ERROR budget file malformed: {path}: {e}", file=sys.stderr) sys.exit(3) default = raw.get("default", {}) or {} cells = {} for row in raw.get("cell", []) or []: cap = row.get("cap") lang = row.get("lang") if not cap or not lang: print( f"ERROR budget cell missing cap/lang: {row!r}", file=sys.stderr ) sys.exit(3) cells[(cap, lang)] = row return {"default": default, "cells": cells} def budget_for_cell(budget: dict, cap: str, lang: str) -> dict: """Merge cell-specific overrides on top of [default].""" merged = dict(budget.get("default", {}) or {}) cell = budget.get("cells", {}).get((cap, lang)) if cell: merged.update({k: v for k, v in cell.items() if k not in ("cap", "lang")}) # Fall back to a wildcard override if present. if not cell: wildcard = budget.get("cells", {}).get((cap, "*")) or \ budget.get("cells", {}).get(("*", lang)) or \ budget.get("cells", {}).get(("*", "*")) if wildcard: merged.update({k: v for k, v in wildcard.items() if k not in ("cap", "lang")}) return merged def enforce_budget(cells: list, budget: dict) -> list: """Return a list of human-readable failure strings. Each cell's measured Unsupported / false-Confirmed / repro-stability rate is compared against its merged budget row. A missing measurement (e.g. no Confirmed findings → false-Confirmed denominator = 0) is treated as "no data" and skipped, never as a failure. """ failures = [] soft_unsupported = _soft_unsupported() for c in cells: b = budget_for_cell(budget, c["cap"], c["lang"]) if not b: continue cap, lang = c["cap"], c["lang"] max_unsup = b.get("unsupported_rate") max_false = b.get("false_confirmed_rate") min_stable = b.get("repro_stability") min_confirmed = b.get("confirmed_rate") if isinstance(max_unsup, (int, float)) and c.get("total", 0) > 0: if c["unsupported_rate"] > max_unsup: # No-sound-oracle caps (and `other`) are report-only by design; # the rest are report-only when dynamic confirmation is known to # be environment-constrained (NYX_EVAL_SOFT_UNSUPPORTED, set by # CI). Hard otherwise so local dev still ratchets coverage. line = ( f" {cap}/{lang}: Unsupported {c['unsupported_rate']*100:.1f}%" f" > budget {max_unsup*100:.1f}%" ) if not (cap in NO_SOUND_ORACLE_CAPS or soft_unsupported): failures.append(f" FAIL{line}") if isinstance(min_confirmed, (int, float)) and c.get("total", 0) > 0: rate = c.get("confirmed", 0) / c["total"] if rate < min_confirmed: failures.append( f" FAIL {cap}/{lang}: Confirmed {rate*100:.1f}%" f" < budget {min_confirmed*100:.1f}%" ) if isinstance(max_false, (int, float)) and c.get("confirmed", 0) > 0: rate = c.get("wrong_confirmed", 0) / c["confirmed"] if rate > max_false: failures.append( f" FAIL {cap}/{lang}: false-Confirmed {rate*100:.1f}%" f" > budget {max_false*100:.1f}%" ) # Repro stability is only enforced when callers stamped at least # one `replay_stable: true` flag — otherwise stable_replays == 0 # is indistinguishable from "we did not measure stability for # this row" and the gate would fire vacuously on every clean run. if ( isinstance(min_stable, (int, float)) and c.get("confirmed", 0) > 0 and c.get("stable_replays", 0) > 0 ): rate = c["stable_replays"] / c["confirmed"] if rate < min_stable: failures.append( f" FAIL {cap}/{lang}: repro stability {rate*100:.1f}%" f" < budget {min_stable*100:.1f}%" ) return failures # ── Diff loading ──────────────────────────────────────────────────────────── def load_previous_cells(path: str, label: str) -> dict: """Index a previous results file by (cap, lang) → cell. The previous file is the same shape as `--append`'s output. We pick the record whose `label` matches the current run; if no exact match, fall back to the first record. Missing/unreadable files exit 3. """ try: with open(path) as f: data = json.load(f) except FileNotFoundError: print(f"ERROR diff file not found: {path}", file=sys.stderr) sys.exit(3) except json.JSONDecodeError as e: print(f"ERROR diff file malformed: {path}: {e}", file=sys.stderr) sys.exit(3) records = data if isinstance(data, list) else [data] chosen = None for r in records: if r.get("label") == label: chosen = r break if chosen is None and records: chosen = records[0] if not chosen: return {} return {(c["cap"], c["lang"]): c for c in chosen.get("cells", [])} def diff_regressions(cells: list, prev: dict) -> list: """Compare current cells against previous. Returns failure strings. Three monotonicity rules: * Unsupported% must not increase. * False-Confirmed% must not increase. * Repro-stability% must not decrease. Cells absent from `prev` are treated as new (skipped). A small epsilon (0.5 percentage points) absorbs flake noise. """ EPS = 0.005 failures = [] for c in cells: key = (c["cap"], c["lang"]) old = prev.get(key) if not old: continue # Unsupported. old_unsup = old.get("unsupported_rate", 0.0) new_unsup = c.get("unsupported_rate", 0.0) if new_unsup > old_unsup + EPS: failures.append( f" REGRESSION {key[0]}/{key[1]}: Unsupported" f" {old_unsup*100:.1f}% → {new_unsup*100:.1f}%" ) # False-Confirmed. old_conf = old.get("confirmed", 0) old_false = (old.get("wrong_confirmed", 0) / old_conf) if old_conf else None new_conf = c.get("confirmed", 0) new_false = (c.get("wrong_confirmed", 0) / new_conf) if new_conf else None if old_false is not None and new_false is not None and new_false > old_false + EPS: failures.append( f" REGRESSION {key[0]}/{key[1]}: false-Confirmed" f" {old_false*100:.1f}% → {new_false*100:.1f}%" ) # Repro stability (higher is better). old_stable = ( (old.get("stable_replays", 0) / old_conf) if old_conf else None ) new_stable = ( (c.get("stable_replays", 0) / new_conf) if new_conf else None ) if ( old_stable is not None and new_stable is not None and new_stable < old_stable - EPS ): failures.append( f" REGRESSION {key[0]}/{key[1]}: repro stability" f" {old_stable*100:.1f}% → {new_stable*100:.1f}%" ) return failures def main() -> int: p = argparse.ArgumentParser() p.add_argument("--label", required=True) p.add_argument("--scan", required=True, help="nyx scan --format json output") p.add_argument("--ground-truth", default="", help="ground truth JSON") p.add_argument("--inhouse", action="store_true") p.add_argument("--append", required=True, help="results accumulator JSON") p.add_argument( "--manual-triage", default="", help=( "path to a manual-triage JSON file (list of " "{path, line, cap, vuln: bool}). Confirmed findings matching a " "`vuln: false` entry are stamped with `wrong: true` before " "tabulation so the per-cell False-Confirmed budget becomes " "non-vacuous without depending on the host's `nyx verify-feedback` " "log. Matching uses LINE_TOLERANCE (=5) — line == 0 in the triage " "entry matches any line." ), ) p.add_argument( "--budget", default="", help="path to budget.toml (per-(cap,lang) thresholds)", ) p.add_argument( "--lang", default="", help=( "comma-separated language allowlist (python, javascript, php, " "ruby, go, rust, ...). When set, only findings AND ground-truth " "entries whose source language is in the list are tabulated; " "everything else is dropped before tallying. Used by the Phase 29 " "polyglot corpora (Track R.2) to scope a single-language corpus to " "its target language so incidental third-party assets in other " "languages — e.g. the vendored JavaScript a Rails or aiohttp app " "bundles — do not pollute that corpus's per-cap metrics. Empty = " "no language filter (every finding tabulated, the OWASP/JSTS " "default)." ), ) p.add_argument( "--diff", default="", help="path to a previous results JSON; fail on monotonic-improvement regression", ) p.add_argument( "--static", action="store_true", help=( "static lens: bucket SHELL_ESCAPE (1<<2) findings as cmdi even when " "they are unconfirmed. Java (and other) command-exec sinks carry " "SHELL_ESCAPE and only get remapped to CODE_EXEC on dynamic Confirm; " "without this flag, an env with 0 confirmations reads the cmdi cell " "as 0/0/N regardless of static quality. SHELL_ESCAPE is the " "command-injection sink cap for every language, so this is sound " "globally; it is opt-in only so the default confirmed-recall " "bucketing stays byte-identical." ), ) args = p.parse_args() lang_filter = {l.strip() for l in args.lang.split(",") if l.strip()} scan_data = load_json(args.scan) findings = scan_data if isinstance(scan_data, list) else scan_data.get("findings", []) # Score only Security-category findings against the security ground truth. # Reliability defects (resource leaks, error-handling fallthrough) and # Quality findings are real bugs but not the injection / crypto / auth # vulns the corpus ground truth enumerates, so counting them as security # false-positives is a category error that wrecks precision with pure # noise. Findings with no explicit category (legacy fixtures) default to # Security and are kept. findings = [ f for f in findings if f.get("category", "Security") not in ("Reliability", "Quality") ] if lang_filter: findings = [f for f in findings if lang_of(f) in lang_filter] # ── Manual-triage stamping (Phase 31 follow-up) ─────────────────────── # Cross-reference Confirmed rows against a manual-triage file before # tabulation. Each `vuln: false` entry whose `(path, cap)` matches a # Confirmed finding (with LINE_TOLERANCE, or any line when triage # entry's `line == 0`) stamps `wrong: true` on the finding's # `dynamic_verdict`, which the existing wrong_confirmed counter picks # up below. Decouples the False-Confirmed budget from the host-local # `nyx verify-feedback` log so CI on a fresh eval corpus can still # gate the headline target. if args.manual_triage and Path(args.manual_triage).exists(): triage = load_json(args.manual_triage) not_vuln: list[dict] = [] for entry in triage if isinstance(triage, list) else []: if entry.get("vuln") is False: not_vuln.append({ "path": entry.get("path", ""), "line": entry.get("line", 0), "cap": entry.get("cap", ""), }) used: set[int] = set() for f in findings: ev = f.get("evidence") or {} dv = ev.get("dynamic_verdict") or {} if dv.get("status") != "Confirmed": continue f_path = f.get("path", "") f_line = f.get("line", 0) f_cap = cap_of(f, static_lens=args.static) for idx, entry in enumerate(not_vuln): if idx in used: continue if (path_matches(entry["path"], f_path) and entry["cap"] == f_cap and (entry["line"] == 0 or abs(entry["line"] - f_line) <= LINE_TOLERANCE)): used.add(idx) dv["wrong"] = True ev["dynamic_verdict"] = dv f["evidence"] = ev break # Per-cell tallies: {(cap, lang): {tp, fp, fn, unsupported, confirmed, # partially_confirmed, wrong_confirmed, stable_replays, total}} cells: dict[tuple[str, str], dict] = defaultdict( lambda: { "tp": 0, "fp": 0, "fn": 0, "unsupported": 0, "confirmed": 0, "partially_confirmed": 0, "wrong_confirmed": 0, "stable_replays": 0, # Confirmed-verdict precision/recall accounting, ground-truth-derived # (only populated when --ground-truth is supplied): confirmed_tp = # Confirmed findings that match a GT positive; confirmed_fp = # Confirmed findings that match no GT positive (false confirms). "confirmed_tp": 0, "confirmed_fp": 0, "total": 0, } ) for f in findings: cap = cap_of(f, static_lens=args.static) lang = lang_of(f) key = (cap, lang) ev = f.get("evidence", {}) or {} dv = ev.get("dynamic_verdict") if ev else None cells[key]["total"] += 1 if dv: status = dv.get("status") if status == "Unsupported": cells[key]["unsupported"] += 1 elif status == "PartiallyConfirmed": cells[key]["partially_confirmed"] += 1 elif status == "Confirmed": cells[key]["confirmed"] += 1 # Repro-stability and false-Confirmed counts are optional # fields tabulate.py reads off the verdict when callers have # stamped them. if dv.get("wrong") is True: cells[key]["wrong_confirmed"] += 1 if dv.get("replay_stable") is True: cells[key]["stable_replays"] += 1 if not args.inhouse and args.ground_truth and Path(args.ground_truth).exists(): gt = load_json(args.ground_truth) # Ground truth format: list of {"path": ..., "line": ..., "cap": ..., "vuln": bool} gt_true: list[dict] = [] for entry in gt if isinstance(gt, list) else []: # Honour the same language scope as the findings filter so recall # is measured only over the corpus's target language. if lang_filter and lang_of(entry) not in lang_filter: continue if entry.get("vuln"): gt_true.append({ "path": entry.get("path", ""), "line": entry.get("line", 0), "cap": entry.get("cap", ""), }) # Track which GT entries were matched (by index) to avoid double-counting. matched_gt: set[int] = set() # Track (path, cap) pairs that had at least one finding match. found_path_caps: set[tuple[str, str]] = set() for f in findings: f_path = f.get("path", "") f_line = f.get("line", 0) f_cap = cap_of(f, static_lens=args.static) cap = f_cap lang = lang_of(f) cell_key = (cap, lang) dv = (f.get("evidence") or {}).get("dynamic_verdict") or {} is_confirmed = dv.get("status") == "Confirmed" matched_idx = None for idx, gt_entry in enumerate(gt_true): if (path_matches(gt_entry["path"], f_path) and gt_entry["cap"] == f_cap and idx not in matched_gt and (gt_entry["line"] == 0 or abs(gt_entry["line"] - f_line) <= LINE_TOLERANCE)): matched_idx = idx break if matched_idx is not None: matched_gt.add(matched_idx) found_path_caps.add((f_path, f_cap)) cells[cell_key]["tp"] += 1 if is_confirmed: cells[cell_key]["confirmed_tp"] += 1 else: cells[cell_key]["fp"] += 1 if is_confirmed: cells[cell_key]["confirmed_fp"] += 1 for idx, gt_entry in enumerate(gt_true): if idx not in matched_gt: cap = gt_entry["cap"] # Land the FN in the cell its source language implies (from the # GT path extension) so per-(cap,lang) recall is meaningful and # OWASP misses show up in the java cell, not a stray "unknown". cells[(cap, lang_of(gt_entry))]["fn"] += 1 # Ground-truth-derived false-confirm accounting. When a corpus ships a # vuln/benign label per file (OWASP, SARD), a Confirmed finding that # matches no GT positive is a false confirm — authoritative, so it # overrides any manual-triage stamping for these labelled sets. This is # what makes the per-cell `false_confirmed_rate` budget non-vacuous on a # fresh eval corpus without a host-local verify-feedback log. for v in cells.values(): if v["confirmed_tp"] or v["confirmed_fp"]: v["wrong_confirmed"] = v["confirmed_fp"] result = { "label": args.label, "total_findings": len(findings), "cells": [ { "cap": k[0], "lang": k[1], **v, "precision": v["tp"] / max(v["tp"] + v["fp"], 1), "recall": v["tp"] / max(v["tp"] + v["fn"], 1), "unsupported_rate": v["unsupported"] / max(v["total"], 1), } for k, v in sorted(cells.items()) ], } existing = load_json(args.append) if Path(args.append).exists() else [] existing.append(result) with open(args.append, "w") as f: json.dump(existing, f, indent=2) # Print summary print(f"\n=== {args.label} ===") print(f"{'Cap':<20} {'Lang':<12} {'TP':>5} {'FP':>5} {'FN':>5} {'Prec':>6} {'Rec':>6} {'Unsup%':>7}") print("-" * 72) for c in result["cells"]: print( f"{c['cap']:<20} {c['lang']:<12} " f"{c['tp']:>5} {c['fp']:>5} {c['fn']:>5} " f"{c['precision']:>6.2f} {c['recall']:>6.2f} " f"{c['unsupported_rate']*100:>6.1f}%" ) exit_rc = 0 # ── Phase 29: per-cell budget enforcement ───────────────────────────── if args.budget: budget = load_budget(args.budget) failures = enforce_budget(result["cells"], budget) if failures: print(f"\n=== Per-cell budget regressions ({args.budget}) ===") for line in failures: print(line) exit_rc = 2 else: print(f"\nPer-cell budget ({args.budget}): OK") # ── Phase 29: diff against previous run ─────────────────────────────── if args.diff: prev = load_previous_cells(args.diff, args.label) failures = diff_regressions(result["cells"], prev) if failures: print(f"\n=== Monotonic-improvement regressions vs {args.diff} ===") for line in failures: print(line) exit_rc = 2 else: print(f"\nDiff vs {args.diff}: no regressions") return exit_rc if __name__ == "__main__": sys.exit(main())