nyx/tests/eval_corpus/report.py
2026-06-05 10:16:30 -05:00

483 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Aggregate eval results across all corpus sets and emit a summary table.
Used by run.sh after all corpus sets have been tabulated.
Phase 29 (Track I) extensions:
  --budget tests/eval_corpus/budget.toml   per-cell budget enforcement
  --diff previous.json                     monotonic-improvement diff;
                                           CI fails on any regression.
"""
import argparse
import json
import os
import sys
from collections import defaultdict
try:
import tomllib # Python 3.11+
except ModuleNotFoundError: # pragma: no cover — older interpreters only
import tomli as tomllib # type: ignore[no-redef]
# Caps with no sound runtime oracle: config / usage smells (weak crypto,
# insecure-cookie auth, reflected XSS / trust-boundary) route to
# Unsupported(SoundOracleUnavailable) by design, and the catch-all `other`
# bucket holds unclassified findings with no curated payloads. Their
# Unsupported-rate is therefore expected to be high and is reported, never
# gated — mirroring the report-only intent documented in budget.toml.
#
# Consulted by the per-cell budget check in main(): an Unsupported-rate
# overrun for a cap listed here is recorded as a "soft" (report-only) line
# rather than a hard failure. The other budget checks do not look at this set.
NO_SOUND_ORACLE_CAPS = {"auth", "crypto", "xss", "trustbound", "other"}
def _soft_unsupported() -> bool:
    """Tell whether the per-cell Unsupported-rate budget is advisory only.

    The answer comes from the `NYX_EVAL_SOFT_UNSUPPORTED` environment
    variable: after trimming whitespace and lower-casing, the values "1",
    "true", "yes" and "on" enable soft mode; anything else (including an
    unset variable) leaves the budget hard.

    Rationale: dynamic confirmation is environment-constrained in CI
    (unprivileged sandbox, no oracle infrastructure for some caps), so an
    Unsupported-rate budget calibrated on a dev box where confirmation runs
    fully would fail vacuously there. CI sets the variable to demote that one
    budget to report-only; the precision (false-Confirmed) and confirmed-rate
    ratchets stay hard.
    """
    truthy = ("1", "true", "yes", "on")
    raw_flag = os.environ.get("NYX_EVAL_SOFT_UNSUPPORTED", "")
    normalized = raw_flag.strip().lower()
    return normalized in truthy
def load_budget(path: str) -> dict:
try:
with open(path, "rb") as f:
raw = tomllib.load(f)
except FileNotFoundError:
print(f"ERROR budget file not found: {path}", file=sys.stderr)
sys.exit(3)
except tomllib.TOMLDecodeError as e:
print(f"ERROR budget file malformed: {path}: {e}", file=sys.stderr)
sys.exit(3)
default = raw.get("default", {}) or {}
cells = {}
for row in raw.get("cell", []) or []:
cap = row.get("cap")
lang = row.get("lang")
if not cap or not lang:
print(f"ERROR budget cell missing cap/lang: {row!r}", file=sys.stderr)
sys.exit(3)
cells[(cap, lang)] = row
return {"default": default, "cells": cells}
def budget_for_cell(budget: dict, cap: str, lang: str) -> dict:
    """Resolve the effective thresholds for one (cap, lang) cell.

    The ``"default"`` section is the starting point. An exact
    ``(cap, lang)`` row overrides it; when there is no exact row, the first
    wildcard row found in the order ``(cap, "*")``, ``("*", lang)``,
    ``("*", "*")`` overrides it instead. The ``cap`` / ``lang`` keys of a row
    are never copied into the result.

    Args:
        budget: mapping with ``"default"`` and ``"cells"`` entries.
        cap: capability name of the cell.
        lang: language name of the cell.

    Returns:
        A new dict of thresholds; ``budget`` itself is not modified.
    """

    def _thresholds(row: dict) -> dict:
        # Drop the row's identifying keys, keep only the threshold values.
        return {key: val for key, val in row.items() if key not in ("cap", "lang")}

    effective = dict(budget.get("default", {}) or {})
    table = budget.get("cells", {})

    exact = table.get((cap, lang))
    if exact:
        effective.update(_thresholds(exact))
        return effective

    # No exact row: fall back to the most specific wildcard that exists.
    for pattern in ((cap, "*"), ("*", lang), ("*", "*")):
        fallback = table.get(pattern)
        if fallback:
            effective.update(_thresholds(fallback))
            break
    return effective
def load_previous_agg(path: str) -> dict:
    """Load an earlier results file and sum its counters per (cap, lang) cell.

    The aggregation matches the one main() performs on the current results,
    so the two can be compared cell by cell.

    Args:
        path: location of the previous results JSON file.

    Returns:
        A defaultdict keyed by ``(cap, lang)``; each value is a dict of
        integer counters summed over every cell of every result entry.
        Counters missing from a cell count as 0.

    Raises:
        SystemExit: status 3, after an ``ERROR`` line on stderr, when the
            file is missing or is not valid JSON.
    """
    try:
        with open(path) as fh:
            previous = json.load(fh)
    except FileNotFoundError:
        print(f"ERROR diff file not found: {path}", file=sys.stderr)
        sys.exit(3)
    except json.JSONDecodeError as exc:
        print(f"ERROR diff file malformed: {path}: {exc}", file=sys.stderr)
        sys.exit(3)

    counters = (
        "tp",
        "fp",
        "fn",
        "unsupported",
        "confirmed",
        "partially_confirmed",
        "wrong_confirmed",
        "stable_replays",
        "confirmed_tp",
        "confirmed_fp",
        "total",
    )
    # Every newly seen cell starts with all counters at zero.
    totals: dict[tuple[str, str], dict] = defaultdict(
        lambda: dict.fromkeys(counters, 0)
    )
    for result in previous:
        for cell in result.get("cells", []):
            bucket = totals[(cell["cap"], cell["lang"])]
            for name in counters:
                bucket[name] += cell.get(name, 0)
    return totals
def _load_results(path: str):
    """Read the ``--results`` JSON file, exiting with status 3 when unusable.

    A missing or malformed file is reported with an ``ERROR`` line on stderr
    and a status-3 exit, the same convention this script applies to its
    ``--budget`` and ``--diff`` inputs, instead of surfacing a traceback.
    """
    try:
        with open(path) as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"ERROR results file not found: {path}", file=sys.stderr)
        sys.exit(3)
    except json.JSONDecodeError as e:
        print(f"ERROR results file malformed: {path}: {e}", file=sys.stderr)
        sys.exit(3)


def main() -> int:
    """Aggregate the eval results, print the report tables and apply the gates.

    Command-line flags (parsed from ``sys.argv``):
      --results             required; JSON results file to aggregate.
      --budget              optional budget.toml; enables per-cell budgets.
                            Without it the legacy 80% Unsupported gate runs.
      --diff                optional previous results file; any regression
                            against it fails the run.
      --min-confirmed-rate  optional per-cap Confirmed / total floor.
      --min-precision       optional per-cap precision floor.
      --min-recall          optional per-cap recall floor.
      --floor-caps          optional comma-separated allowlist restricting
                            which caps the three floors are enforced for.

    Returns:
        0 when every enabled gate passes (or the results file holds no
        results), 2 when any gate fails.

    Raises:
        SystemExit: status 3 when the results, budget or diff file is missing
            or malformed.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--results", required=True)
    p.add_argument(
        "--budget",
        default="",
        help="path to budget.toml (per-(cap,lang) thresholds)",
    )
    p.add_argument(
        "--diff",
        default="",
        help="path to a previous results.json; fail on monotonic-improvement regression",
    )
    p.add_argument(
        "--min-confirmed-rate",
        type=float,
        default=None,
        help=(
            "minimum Confirmed / total rate per cap; exits 2 when any cap "
            "with findings falls below the threshold"
        ),
    )
    p.add_argument(
        "--min-precision",
        type=float,
        default=None,
        help=(
            "minimum precision (tp / (tp+fp)) per cap; exits 2 when any cap "
            "with at least one finding falls below the threshold. Phase 27 "
            "OWASP acceptance floor (>= 0.85)."
        ),
    )
    p.add_argument(
        "--min-recall",
        type=float,
        default=None,
        help=(
            "minimum recall (tp / (tp+fn)) per cap; exits 2 when any cap "
            "with at least one ground-truth positive falls below the "
            "threshold. Phase 27 OWASP acceptance floor (>= 0.40)."
        ),
    )
    p.add_argument(
        "--floor-caps",
        default="",
        help=(
            "comma-separated cap allowlist. When set, the --min-confirmed-rate, "
            "--min-precision and --min-recall floors are ENFORCED only for these "
            "caps; other caps are still measured and printed but not gated. Used "
            "to exempt caps with no sound runtime oracle (e.g. crypto weak "
            "randomness, secure-cookie config smells) from dynamic-confirmation "
            "floors that they fundamentally cannot meet. Empty = gate every cap."
        ),
    )
    args = p.parse_args()
    # Empty set means "no allowlist": every cap is in scope for the floors.
    floor_caps = {c.strip() for c in args.floor_caps.split(",") if c.strip()}

    results = _load_results(args.results)
    if not results:
        print("No results to report.")
        return 0

    # Aggregate across sets: sum every counter per (cap, lang) cell.
    fields = (
        "tp",
        "fp",
        "fn",
        "unsupported",
        "confirmed",
        "partially_confirmed",
        "wrong_confirmed",
        "stable_replays",
        "confirmed_tp",
        "confirmed_fp",
        "total",
    )
    agg: dict[tuple[str, str], dict] = defaultdict(
        lambda: dict.fromkeys(fields, 0)
    )
    for r in results:
        for c in r.get("cells", []):
            k = (c["cap"], c["lang"])
            for field in fields:
                agg[k][field] += c.get(field, 0)

    print("\n=== Aggregated eval corpus report ===")
    print(
        f"{'Cap':<20} {'Lang':<12} {'TP':>5} {'FP':>5} {'FN':>5} "
        f"{'Prec':>6} {'Rec':>6} {'Unsup%':>7} {'Conf%':>7} {'Part%':>7}"
    )
    print("-" * 88)
    for k, v in sorted(agg.items()):
        # max(..., 1) keeps empty cells at 0.00 instead of dividing by zero.
        prec = v["tp"] / max(v["tp"] + v["fp"], 1)
        rec = v["tp"] / max(v["tp"] + v["fn"], 1)
        unsup = v["unsupported"] / max(v["total"], 1)
        conf = v["confirmed"] / max(v["total"], 1)
        part = v["partially_confirmed"] / max(v["total"], 1)
        print(
            f"{k[0]:<20} {k[1]:<12} "
            f"{v['tp']:>5} {v['fp']:>5} {v['fn']:>5} "
            f"{prec:>6.2f} {rec:>6.2f} "
            f"{unsup*100:>6.1f}% {conf*100:>6.1f}% {part*100:>6.1f}%"
        )

    gate_failed = False

    # ── Phase 29: per-cell budget enforcement ────────────────────────────
    if args.budget:
        budget = load_budget(args.budget)
        print(f"\n=== Per-cell budget ({args.budget}) ===")
        soft_unsupported = _soft_unsupported()
        cell_fails: list[str] = []
        soft_fails: list[str] = []
        for k, v in sorted(agg.items()):
            b = budget_for_cell(budget, k[0], k[1])
            if not b:
                continue
            max_unsup = b.get("unsupported_rate")
            max_false = b.get("false_confirmed_rate")
            min_stable = b.get("repro_stability")
            min_confirmed = b.get("confirmed_rate")

            if isinstance(max_unsup, (int, float)) and v["total"] > 0:
                rate = v["unsupported"] / v["total"]
                if rate > max_unsup:
                    msg = (
                        f"{k[0]}/{k[1]}: Unsupported {rate*100:.1f}%"
                        f" > budget {max_unsup*100:.1f}%"
                    )
                    # Report-only for no-sound-oracle caps, or for every cap
                    # when the environment demotes this budget to soft.
                    if k[0] in NO_SOUND_ORACLE_CAPS or soft_unsupported:
                        soft_fails.append(f" soft {msg}")
                    else:
                        cell_fails.append(f" FAIL {msg}")

            if isinstance(max_false, (int, float)) and v["confirmed"] > 0:
                rate = v["wrong_confirmed"] / v["confirmed"]
                if rate > max_false:
                    cell_fails.append(
                        f" FAIL {k[0]}/{k[1]}: false-Confirmed {rate*100:.1f}%"
                        f" > budget {max_false*100:.1f}%"
                    )

            # Only checked when at least one stable replay was recorded.
            if (
                isinstance(min_stable, (int, float))
                and v["confirmed"] > 0
                and v.get("stable_replays", 0) > 0
            ):
                rate = v["stable_replays"] / v["confirmed"]
                if rate < min_stable:
                    cell_fails.append(
                        f" FAIL {k[0]}/{k[1]}: repro stability {rate*100:.1f}%"
                        f" < budget {min_stable*100:.1f}%"
                    )

            if isinstance(min_confirmed, (int, float)) and v["total"] > 0:
                rate = v["confirmed"] / v["total"]
                if rate < min_confirmed:
                    cell_fails.append(
                        f" FAIL {k[0]}/{k[1]}: Confirmed {rate*100:.1f}%"
                        f" < budget {min_confirmed*100:.1f}%"
                    )

        if soft_fails:
            print(
                " Unsupported-rate over budget (report-only: no-sound-oracle "
                "cap or environment-constrained dynamic confirmation):"
            )
            for line in soft_fails:
                print(line)
        if cell_fails:
            for line in cell_fails:
                print(line)
            gate_failed = True
        else:
            print(" All hard per-cell budgets met.")
    else:
        # Legacy fallback: per-cell Unsupported rate <= 80%.
        print("\n=== Gate checks ===")
        UNSUPPORTED_BUDGET = 0.80
        cell_fails = []
        for k, v in sorted(agg.items()):
            unsup = v["unsupported"] / max(v["total"], 1)
            if unsup > UNSUPPORTED_BUDGET:
                cell_fails.append(
                    f" FAIL {k[0]}/{k[1]}: Unsupported {unsup*100:.1f}%"
                    f" > {UNSUPPORTED_BUDGET*100:.0f}% budget"
                )
        if cell_fails:
            for line in cell_fails:
                print(line)
            gate_failed = True
        else:
            print(" All gate thresholds met.")

    # ── Per-cap Confirmed-rate (published always; gated when a floor given) ──
    # Aggregated per cap across languages. The table is always printed so the
    # corpus's confirmation profile is visible ("publish per-cap …"); the floor
    # only FAILS the run when --min-confirmed-rate is supplied and the cap is in
    # scope (floor_caps empty = every cap in scope).
    cap_totals: dict[str, dict] = defaultdict(lambda: {"confirmed": 0, "total": 0})
    for (cap, _lang), v in agg.items():
        cap_totals[cap]["confirmed"] += v.get("confirmed", 0)
        cap_totals[cap]["total"] += v.get("total", 0)
    if cap_totals:
        floor_txt = (
            f" (floor {args.min_confirmed_rate*100:.1f}%)"
            if args.min_confirmed_rate is not None
            else " (report-only)"
        )
        print(f"\n=== Per-cap Confirmed-rate{floor_txt} ===")
        confirmed_fails: list[str] = []
        for cap, v in sorted(cap_totals.items()):
            if v["total"] <= 0:
                continue
            rate = v["confirmed"] / v["total"]
            gated = args.min_confirmed_rate is not None and (
                (not floor_caps) or (cap in floor_caps)
            )
            line = (
                f" {cap:<20} {v['confirmed']:>5}/{v['total']:<5} "
                f"{rate*100:>6.1f}%"
            )
            if gated and rate < args.min_confirmed_rate:
                # Failing rows are collected and printed after the others.
                confirmed_fails.append(f"{line} FAIL")
            elif args.min_confirmed_rate is None:
                print(line)
            else:
                print(f"{line} {'OK' if gated else 'skip (no floor)'}")
        if confirmed_fails:
            for line in confirmed_fails:
                print(line)
            gate_failed = True
        elif args.min_confirmed_rate is not None:
            print(" All confirmed-rate floors met.")

    # ── Per-cap precision / recall (published always; gated when a floor given) ──
    # OWASP acceptance: per-cap precision ≥ 0.85, recall ≥ 0.40. Aggregated per
    # cap across languages (tp/fp/fn summed over every lang cell). The table is
    # always printed ("publish per-cap precision/recall"); a cap FAILS only when
    # the matching --min-* floor is supplied and the cap is in scope (floor_caps
    # empty = every cap in scope).
    cap_pr: dict[str, dict] = defaultdict(lambda: {"tp": 0, "fp": 0, "fn": 0})
    for (cap, _lang), v in agg.items():
        cap_pr[cap]["tp"] += v.get("tp", 0)
        cap_pr[cap]["fp"] += v.get("fp", 0)
        cap_pr[cap]["fn"] += v.get("fn", 0)
    if cap_pr:
        floors = []
        if args.min_precision is not None:
            floors.append(f"precision ≥ {args.min_precision*100:.1f}%")
        if args.min_recall is not None:
            floors.append(f"recall ≥ {args.min_recall*100:.1f}%")
        floor_txt = f" (floors: {', '.join(floors)})" if floors else " (report-only)"
        print(f"\n=== Per-cap precision/recall{floor_txt} ===")
        print(f" {'Cap':<20} {'TP':>5} {'FP':>5} {'FN':>5} {'Prec':>7} {'Rec':>7} Status")
        pr_failed = False
        any_gated = False
        for cap, v in sorted(cap_pr.items()):
            tp, fp, fn = v["tp"], v["fp"], v["fn"]
            # No findings and no GT positives → cap not present in this corpus.
            if tp + fp + fn == 0:
                continue
            prec = tp / max(tp + fp, 1)
            rec = tp / max(tp + fn, 1)
            gated = (not floor_caps) or (cap in floor_caps)
            tags = []
            # A floor applies only when its denominator is non-zero.
            if (
                gated
                and args.min_precision is not None
                and (tp + fp) > 0
                and prec < args.min_precision
            ):
                tags.append("PRECISION")
            if (
                gated
                and args.min_recall is not None
                and (tp + fn) > 0
                and rec < args.min_recall
            ):
                tags.append("RECALL")
            if tags:
                status = "FAIL " + "+".join(tags)
            elif not floors:
                status = ""
            elif gated:
                status = "OK"
                any_gated = True
            else:
                status = "skip (no floor)"
            print(
                f" {cap:<20} {tp:>5} {fp:>5} {fn:>5} "
                f"{prec:>7.2f} {rec:>7.2f} {status}"
            )
            if tags:
                pr_failed = True
        if pr_failed:
            gate_failed = True
        elif floors and any_gated:
            print(" All per-cap precision/recall floors met.")

    # ── Phase 29: monotonic-improvement diff ─────────────────────────────
    if args.diff:
        prev = load_previous_agg(args.diff)
        print(f"\n=== Monotonic-improvement diff vs {args.diff} ===")
        diff_fails: list[str] = []
        # Tolerance: rate changes of at most half a percentage point pass.
        EPS = 0.005
        for k, v in sorted(agg.items()):
            # .get() avoids creating an entry in the previous-run defaultdict;
            # cells with no previous data are not compared.
            old = prev.get(k)
            if not old:
                continue
            old_unsup = old["unsupported"] / max(old["total"], 1)
            new_unsup = v["unsupported"] / max(v["total"], 1)
            if new_unsup > old_unsup + EPS:
                diff_fails.append(
                    f" REGRESSION {k[0]}/{k[1]}: Unsupported"
                    f" {old_unsup*100:.1f}% → {new_unsup*100:.1f}%"
                )
            old_conf = old.get("confirmed", 0)
            new_conf = v.get("confirmed", 0)
            # Confirmed-relative rates are None (skipped) when a side has no
            # Confirmed findings.
            old_false = (old.get("wrong_confirmed", 0) / old_conf) if old_conf else None
            new_false = (v.get("wrong_confirmed", 0) / new_conf) if new_conf else None
            if (
                old_false is not None
                and new_false is not None
                and new_false > old_false + EPS
            ):
                diff_fails.append(
                    f" REGRESSION {k[0]}/{k[1]}: false-Confirmed"
                    f" {old_false*100:.1f}% → {new_false*100:.1f}%"
                )
            old_stable = (old.get("stable_replays", 0) / old_conf) if old_conf else None
            new_stable = (v.get("stable_replays", 0) / new_conf) if new_conf else None
            if (
                old_stable is not None
                and new_stable is not None
                and new_stable < old_stable - EPS
            ):
                diff_fails.append(
                    f" REGRESSION {k[0]}/{k[1]}: repro stability"
                    f" {old_stable*100:.1f}% → {new_stable*100:.1f}%"
                )
        if diff_fails:
            for line in diff_fails:
                print(line)
            gate_failed = True
        else:
            print(" No regressions vs previous run.")

    return 2 if gate_failed else 0
if __name__ == "__main__":
sys.exit(main())