mirror of
https://github.com/elicpeter/nyx.git
synced 2026-06-06 19:35:13 +02:00
483 lines
18 KiB
Python
483 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Aggregate eval results across all corpus sets and emit a summary table.
|
|
Used by run.sh after all corpus sets have been tabulated.
|
|
|
|
Phase 29 (Track I) extensions:
|
|
--budget tests/eval_corpus/budget.toml per-cell budget enforcement
|
|
--diff previous.json monotonic-improvement diff;
|
|
CI fails on any regression.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
from collections import defaultdict
|
|
|
|
try:
|
|
import tomllib # Python 3.11+
|
|
except ModuleNotFoundError: # pragma: no cover — older interpreters only
|
|
import tomli as tomllib # type: ignore[no-redef]
|
|
|
|
# Caps with no sound runtime oracle: config / usage smells (weak crypto,
|
|
# insecure-cookie auth, reflected XSS / trust-boundary) route to
|
|
# Unsupported(SoundOracleUnavailable) by design, and the catch-all `other`
|
|
# bucket holds unclassified findings with no curated payloads. Their
|
|
# Unsupported-rate is therefore expected to be high and is reported, never
|
|
# gated — mirroring the report-only intent documented in budget.toml.
|
|
NO_SOUND_ORACLE_CAPS = {"auth", "crypto", "xss", "trustbound", "other"}
|
|
|
|
|
|
def _soft_unsupported() -> bool:
|
|
"""True when the per-cell Unsupported-rate budget is report-only.
|
|
|
|
Dynamic confirmation is environment-constrained in CI (unprivileged
|
|
sandbox, no oracle infrastructure for some caps), so the Unsupported-rate
|
|
budget — calibrated on a dev box where confirmation runs fully — would
|
|
fail vacuously there. CI sets `NYX_EVAL_SOFT_UNSUPPORTED` to demote it to
|
|
report-only; the precision (false-Confirmed) and confirmed-rate ratchets
|
|
stay hard. Unset (local dev) keeps the Unsupported budget hard.
|
|
"""
|
|
return os.environ.get("NYX_EVAL_SOFT_UNSUPPORTED", "").strip().lower() in (
|
|
"1",
|
|
"true",
|
|
"yes",
|
|
"on",
|
|
)
|
|
|
|
|
|
def load_budget(path: str) -> dict:
|
|
try:
|
|
with open(path, "rb") as f:
|
|
raw = tomllib.load(f)
|
|
except FileNotFoundError:
|
|
print(f"ERROR budget file not found: {path}", file=sys.stderr)
|
|
sys.exit(3)
|
|
except tomllib.TOMLDecodeError as e:
|
|
print(f"ERROR budget file malformed: {path}: {e}", file=sys.stderr)
|
|
sys.exit(3)
|
|
default = raw.get("default", {}) or {}
|
|
cells = {}
|
|
for row in raw.get("cell", []) or []:
|
|
cap = row.get("cap")
|
|
lang = row.get("lang")
|
|
if not cap or not lang:
|
|
print(f"ERROR budget cell missing cap/lang: {row!r}", file=sys.stderr)
|
|
sys.exit(3)
|
|
cells[(cap, lang)] = row
|
|
return {"default": default, "cells": cells}
|
|
|
|
|
|
def budget_for_cell(budget: dict, cap: str, lang: str) -> dict:
|
|
merged = dict(budget.get("default", {}) or {})
|
|
cell = budget.get("cells", {}).get((cap, lang))
|
|
if cell:
|
|
merged.update({k: v for k, v in cell.items() if k not in ("cap", "lang")})
|
|
if not cell:
|
|
wildcard = (
|
|
budget.get("cells", {}).get((cap, "*"))
|
|
or budget.get("cells", {}).get(("*", lang))
|
|
or budget.get("cells", {}).get(("*", "*"))
|
|
)
|
|
if wildcard:
|
|
merged.update(
|
|
{k: v for k, v in wildcard.items() if k not in ("cap", "lang")}
|
|
)
|
|
return merged
|
|
|
|
|
|
def load_previous_agg(path: str) -> dict:
|
|
"""Aggregate a previous results file the same way main() does."""
|
|
try:
|
|
with open(path) as f:
|
|
data = json.load(f)
|
|
except FileNotFoundError:
|
|
print(f"ERROR diff file not found: {path}", file=sys.stderr)
|
|
sys.exit(3)
|
|
except json.JSONDecodeError as e:
|
|
print(f"ERROR diff file malformed: {path}: {e}", file=sys.stderr)
|
|
sys.exit(3)
|
|
agg: dict[tuple[str, str], dict] = defaultdict(
|
|
lambda: {
|
|
"tp": 0,
|
|
"fp": 0,
|
|
"fn": 0,
|
|
"unsupported": 0,
|
|
"confirmed": 0,
|
|
"partially_confirmed": 0,
|
|
"wrong_confirmed": 0,
|
|
"stable_replays": 0,
|
|
"confirmed_tp": 0,
|
|
"confirmed_fp": 0,
|
|
"total": 0,
|
|
}
|
|
)
|
|
for r in data:
|
|
for c in r.get("cells", []):
|
|
k = (c["cap"], c["lang"])
|
|
for field in (
|
|
"tp",
|
|
"fp",
|
|
"fn",
|
|
"unsupported",
|
|
"confirmed",
|
|
"partially_confirmed",
|
|
"wrong_confirmed",
|
|
"stable_replays",
|
|
"confirmed_tp",
|
|
"confirmed_fp",
|
|
"total",
|
|
):
|
|
agg[k][field] += c.get(field, 0)
|
|
return agg
|
|
|
|
|
|
def main() -> int:
|
|
p = argparse.ArgumentParser()
|
|
p.add_argument("--results", required=True)
|
|
p.add_argument(
|
|
"--budget",
|
|
default="",
|
|
help="path to budget.toml (per-(cap,lang) thresholds)",
|
|
)
|
|
p.add_argument(
|
|
"--diff",
|
|
default="",
|
|
help="path to a previous results.json; fail on monotonic-improvement regression",
|
|
)
|
|
p.add_argument(
|
|
"--min-confirmed-rate",
|
|
type=float,
|
|
default=None,
|
|
help=(
|
|
"minimum Confirmed / total rate per cap; exits 2 when any cap "
|
|
"with findings falls below the threshold"
|
|
),
|
|
)
|
|
p.add_argument(
|
|
"--min-precision",
|
|
type=float,
|
|
default=None,
|
|
help=(
|
|
"minimum precision (tp / (tp+fp)) per cap; exits 2 when any cap "
|
|
"with at least one finding falls below the threshold. Phase 27 "
|
|
"OWASP acceptance floor (>= 0.85)."
|
|
),
|
|
)
|
|
p.add_argument(
|
|
"--min-recall",
|
|
type=float,
|
|
default=None,
|
|
help=(
|
|
"minimum recall (tp / (tp+fn)) per cap; exits 2 when any cap "
|
|
"with at least one ground-truth positive falls below the "
|
|
"threshold. Phase 27 OWASP acceptance floor (>= 0.40)."
|
|
),
|
|
)
|
|
p.add_argument(
|
|
"--floor-caps",
|
|
default="",
|
|
help=(
|
|
"comma-separated cap allowlist. When set, the --min-confirmed-rate, "
|
|
"--min-precision and --min-recall floors are ENFORCED only for these "
|
|
"caps; other caps are still measured and printed but not gated. Used "
|
|
"to exempt caps with no sound runtime oracle (e.g. crypto weak "
|
|
"randomness, secure-cookie config smells) from dynamic-confirmation "
|
|
"floors that they fundamentally cannot meet. Empty = gate every cap."
|
|
),
|
|
)
|
|
args = p.parse_args()
|
|
floor_caps = {c.strip() for c in args.floor_caps.split(",") if c.strip()}
|
|
|
|
with open(args.results) as f:
|
|
results = json.load(f)
|
|
|
|
if not results:
|
|
print("No results to report.")
|
|
return 0
|
|
|
|
# Aggregate across sets.
|
|
agg: dict[tuple[str, str], dict] = defaultdict(
|
|
lambda: {
|
|
"tp": 0,
|
|
"fp": 0,
|
|
"fn": 0,
|
|
"unsupported": 0,
|
|
"confirmed": 0,
|
|
"partially_confirmed": 0,
|
|
"wrong_confirmed": 0,
|
|
"stable_replays": 0,
|
|
"confirmed_tp": 0,
|
|
"confirmed_fp": 0,
|
|
"total": 0,
|
|
}
|
|
)
|
|
for r in results:
|
|
for c in r.get("cells", []):
|
|
k = (c["cap"], c["lang"])
|
|
for field in (
|
|
"tp",
|
|
"fp",
|
|
"fn",
|
|
"unsupported",
|
|
"confirmed",
|
|
"partially_confirmed",
|
|
"wrong_confirmed",
|
|
"stable_replays",
|
|
"confirmed_tp",
|
|
"confirmed_fp",
|
|
"total",
|
|
):
|
|
agg[k][field] += c.get(field, 0)
|
|
|
|
print("\n=== Aggregated eval corpus report ===")
|
|
print(
|
|
f"{'Cap':<20} {'Lang':<12} {'TP':>5} {'FP':>5} {'FN':>5} "
|
|
f"{'Prec':>6} {'Rec':>6} {'Unsup%':>7} {'Conf%':>7} {'Part%':>7}"
|
|
)
|
|
print("-" * 88)
|
|
for k, v in sorted(agg.items()):
|
|
prec = v["tp"] / max(v["tp"] + v["fp"], 1)
|
|
rec = v["tp"] / max(v["tp"] + v["fn"], 1)
|
|
unsup = v["unsupported"] / max(v["total"], 1)
|
|
conf = v["confirmed"] / max(v["total"], 1)
|
|
part = v["partially_confirmed"] / max(v["total"], 1)
|
|
print(
|
|
f"{k[0]:<20} {k[1]:<12} "
|
|
f"{v['tp']:>5} {v['fp']:>5} {v['fn']:>5} "
|
|
f"{prec:>6.2f} {rec:>6.2f} "
|
|
f"{unsup*100:>6.1f}% {conf*100:>6.1f}% {part*100:>6.1f}%"
|
|
)
|
|
|
|
gate_failed = False
|
|
|
|
# ── Phase 29: per-cell budget enforcement ────────────────────────────
|
|
if args.budget:
|
|
budget = load_budget(args.budget)
|
|
print(f"\n=== Per-cell budget ({args.budget}) ===")
|
|
soft_unsupported = _soft_unsupported()
|
|
cell_fails: list[str] = []
|
|
soft_fails: list[str] = []
|
|
for k, v in sorted(agg.items()):
|
|
b = budget_for_cell(budget, k[0], k[1])
|
|
if not b:
|
|
continue
|
|
max_unsup = b.get("unsupported_rate")
|
|
max_false = b.get("false_confirmed_rate")
|
|
min_stable = b.get("repro_stability")
|
|
min_confirmed = b.get("confirmed_rate")
|
|
|
|
if isinstance(max_unsup, (int, float)) and v["total"] > 0:
|
|
rate = v["unsupported"] / v["total"]
|
|
if rate > max_unsup:
|
|
msg = (
|
|
f"{k[0]}/{k[1]}: Unsupported {rate*100:.1f}%"
|
|
f" > budget {max_unsup*100:.1f}%"
|
|
)
|
|
if k[0] in NO_SOUND_ORACLE_CAPS or soft_unsupported:
|
|
soft_fails.append(f" soft {msg}")
|
|
else:
|
|
cell_fails.append(f" FAIL {msg}")
|
|
if isinstance(max_false, (int, float)) and v["confirmed"] > 0:
|
|
rate = v["wrong_confirmed"] / v["confirmed"]
|
|
if rate > max_false:
|
|
cell_fails.append(
|
|
f" FAIL {k[0]}/{k[1]}: false-Confirmed {rate*100:.1f}%"
|
|
f" > budget {max_false*100:.1f}%"
|
|
)
|
|
if (
|
|
isinstance(min_stable, (int, float))
|
|
and v["confirmed"] > 0
|
|
and v.get("stable_replays", 0) > 0
|
|
):
|
|
rate = v["stable_replays"] / v["confirmed"]
|
|
if rate < min_stable:
|
|
cell_fails.append(
|
|
f" FAIL {k[0]}/{k[1]}: repro stability {rate*100:.1f}%"
|
|
f" < budget {min_stable*100:.1f}%"
|
|
)
|
|
if isinstance(min_confirmed, (int, float)) and v["total"] > 0:
|
|
rate = v["confirmed"] / v["total"]
|
|
if rate < min_confirmed:
|
|
cell_fails.append(
|
|
f" FAIL {k[0]}/{k[1]}: Confirmed {rate*100:.1f}%"
|
|
f" < budget {min_confirmed*100:.1f}%"
|
|
)
|
|
if soft_fails:
|
|
print(
|
|
" Unsupported-rate over budget (report-only: no-sound-oracle "
|
|
"cap or environment-constrained dynamic confirmation):"
|
|
)
|
|
for line in soft_fails:
|
|
print(line)
|
|
if cell_fails:
|
|
for line in cell_fails:
|
|
print(line)
|
|
gate_failed = True
|
|
else:
|
|
print(" All hard per-cell budgets met.")
|
|
else:
|
|
# Legacy fallback: per-cap Unsupported rate <= 80%.
|
|
print("\n=== Gate checks ===")
|
|
UNSUPPORTED_BUDGET = 0.80
|
|
cell_fails: list[str] = []
|
|
for k, v in sorted(agg.items()):
|
|
unsup = v["unsupported"] / max(v["total"], 1)
|
|
if unsup > UNSUPPORTED_BUDGET:
|
|
cell_fails.append(
|
|
f" FAIL {k[0]}/{k[1]}: Unsupported {unsup*100:.1f}%"
|
|
f" > {UNSUPPORTED_BUDGET*100:.0f}% budget"
|
|
)
|
|
if cell_fails:
|
|
for line in cell_fails:
|
|
print(line)
|
|
gate_failed = True
|
|
else:
|
|
print(" All gate thresholds met.")
|
|
|
|
# ── Per-cap Confirmed-rate (published always; gated when a floor given) ──
|
|
# Aggregated per cap across languages. The table is always printed so the
|
|
# corpus's confirmation profile is visible ("publish per-cap …"); the floor
|
|
# only FAILS the run when --min-confirmed-rate is supplied and the cap is in
|
|
# scope (floor_caps empty = every cap in scope).
|
|
cap_totals: dict[str, dict] = defaultdict(lambda: {"confirmed": 0, "total": 0})
|
|
for (cap, _lang), v in agg.items():
|
|
cap_totals[cap]["confirmed"] += v.get("confirmed", 0)
|
|
cap_totals[cap]["total"] += v.get("total", 0)
|
|
if cap_totals:
|
|
floor_txt = (
|
|
f" (floor {args.min_confirmed_rate*100:.1f}%)"
|
|
if args.min_confirmed_rate is not None
|
|
else " (report-only)"
|
|
)
|
|
print(f"\n=== Per-cap Confirmed-rate{floor_txt} ===")
|
|
confirmed_fails: list[str] = []
|
|
for cap, v in sorted(cap_totals.items()):
|
|
if v["total"] <= 0:
|
|
continue
|
|
rate = v["confirmed"] / v["total"]
|
|
gated = args.min_confirmed_rate is not None and (
|
|
(not floor_caps) or (cap in floor_caps)
|
|
)
|
|
line = (
|
|
f" {cap:<20} {v['confirmed']:>5}/{v['total']:<5} "
|
|
f"{rate*100:>6.1f}%"
|
|
)
|
|
if gated and rate < args.min_confirmed_rate:
|
|
confirmed_fails.append(f"{line} FAIL")
|
|
elif args.min_confirmed_rate is None:
|
|
print(line)
|
|
else:
|
|
print(f"{line} {'OK' if gated else 'skip (no floor)'}")
|
|
if confirmed_fails:
|
|
for line in confirmed_fails:
|
|
print(line)
|
|
gate_failed = True
|
|
elif args.min_confirmed_rate is not None:
|
|
print(" All confirmed-rate floors met.")
|
|
|
|
# ── Per-cap precision / recall (published always; gated when a floor given) ──
|
|
# OWASP acceptance: per-cap precision ≥ 0.85, recall ≥ 0.40. Aggregated per
|
|
# cap across languages (tp/fp/fn summed over every lang cell). The table is
|
|
# always printed ("publish per-cap precision/recall"); a cap FAILS only when
|
|
# the matching --min-* floor is supplied and the cap is in scope (floor_caps
|
|
# empty = every cap in scope).
|
|
cap_pr: dict[str, dict] = defaultdict(lambda: {"tp": 0, "fp": 0, "fn": 0})
|
|
for (cap, _lang), v in agg.items():
|
|
cap_pr[cap]["tp"] += v.get("tp", 0)
|
|
cap_pr[cap]["fp"] += v.get("fp", 0)
|
|
cap_pr[cap]["fn"] += v.get("fn", 0)
|
|
if cap_pr:
|
|
floors = []
|
|
if args.min_precision is not None:
|
|
floors.append(f"precision ≥ {args.min_precision*100:.1f}%")
|
|
if args.min_recall is not None:
|
|
floors.append(f"recall ≥ {args.min_recall*100:.1f}%")
|
|
floor_txt = f" (floors: {', '.join(floors)})" if floors else " (report-only)"
|
|
print(f"\n=== Per-cap precision/recall{floor_txt} ===")
|
|
print(f" {'Cap':<20} {'TP':>5} {'FP':>5} {'FN':>5} {'Prec':>7} {'Rec':>7} Status")
|
|
pr_failed = False
|
|
any_gated = False
|
|
for cap, v in sorted(cap_pr.items()):
|
|
tp, fp, fn = v["tp"], v["fp"], v["fn"]
|
|
# No findings and no GT positives → cap not present in this corpus.
|
|
if tp + fp + fn == 0:
|
|
continue
|
|
prec = tp / max(tp + fp, 1)
|
|
rec = tp / max(tp + fn, 1)
|
|
gated = (not floor_caps) or (cap in floor_caps)
|
|
tags = []
|
|
if gated and args.min_precision is not None and (tp + fp) > 0 and prec < args.min_precision:
|
|
tags.append("PRECISION")
|
|
if gated and args.min_recall is not None and (tp + fn) > 0 and rec < args.min_recall:
|
|
tags.append("RECALL")
|
|
if tags:
|
|
status = "FAIL " + "+".join(tags)
|
|
elif not floors:
|
|
status = "—"
|
|
elif gated:
|
|
status = "OK"
|
|
any_gated = True
|
|
else:
|
|
status = "skip (no floor)"
|
|
print(
|
|
f" {cap:<20} {tp:>5} {fp:>5} {fn:>5} "
|
|
f"{prec:>7.2f} {rec:>7.2f} {status}"
|
|
)
|
|
if tags:
|
|
pr_failed = True
|
|
if pr_failed:
|
|
gate_failed = True
|
|
elif floors and any_gated:
|
|
print(" All per-cap precision/recall floors met.")
|
|
|
|
# ── Phase 29: monotonic-improvement diff ─────────────────────────────
|
|
if args.diff:
|
|
prev = load_previous_agg(args.diff)
|
|
print(f"\n=== Monotonic-improvement diff vs {args.diff} ===")
|
|
diff_fails: list[str] = []
|
|
EPS = 0.005
|
|
for k, v in sorted(agg.items()):
|
|
old = prev.get(k)
|
|
if not old:
|
|
continue
|
|
old_unsup = old["unsupported"] / max(old["total"], 1)
|
|
new_unsup = v["unsupported"] / max(v["total"], 1)
|
|
if new_unsup > old_unsup + EPS:
|
|
diff_fails.append(
|
|
f" REGRESSION {k[0]}/{k[1]}: Unsupported"
|
|
f" {old_unsup*100:.1f}% → {new_unsup*100:.1f}%"
|
|
)
|
|
old_conf = old.get("confirmed", 0)
|
|
new_conf = v.get("confirmed", 0)
|
|
old_false = (old.get("wrong_confirmed", 0) / old_conf) if old_conf else None
|
|
new_false = (v.get("wrong_confirmed", 0) / new_conf) if new_conf else None
|
|
if old_false is not None and new_false is not None and new_false > old_false + EPS:
|
|
diff_fails.append(
|
|
f" REGRESSION {k[0]}/{k[1]}: false-Confirmed"
|
|
f" {old_false*100:.1f}% → {new_false*100:.1f}%"
|
|
)
|
|
old_stable = (old.get("stable_replays", 0) / old_conf) if old_conf else None
|
|
new_stable = (v.get("stable_replays", 0) / new_conf) if new_conf else None
|
|
if (
|
|
old_stable is not None
|
|
and new_stable is not None
|
|
and new_stable < old_stable - EPS
|
|
):
|
|
diff_fails.append(
|
|
f" REGRESSION {k[0]}/{k[1]}: repro stability"
|
|
f" {old_stable*100:.1f}% → {new_stable*100:.1f}%"
|
|
)
|
|
if diff_fails:
|
|
for line in diff_fails:
|
|
print(line)
|
|
gate_failed = True
|
|
else:
|
|
print(" No regressions vs previous run.")
|
|
|
|
return 2 if gate_failed else 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|