[pitboss] phase 12: Track B — Python harness emitter shapes

This commit is contained in:
pitboss 2026-05-14 15:30:12 -05:00
parent 523bd0c53a
commit 96eb37500c
29 changed files with 3394 additions and 122 deletions

View file

@ -14,12 +14,15 @@ mod common;
#[cfg(feature = "dynamic")]
mod python_fixture_tests {
use crate::common::fixture_harness::{
run_fixture_and_compare_to_golden, CopyStrategy, FixtureSpec,
run_fixture_and_compare_to_golden, run_harness_snapshot, run_shape_fixture,
CopyStrategy, FixtureSpec,
};
use nyx_scanner::commands::scan::Diag;
use nyx_scanner::dynamic::spec::PayloadSlot;
use nyx_scanner::dynamic::verify::{verify_finding, VerifyOptions};
use nyx_scanner::evidence::{
Confidence, Evidence, FlowStep, FlowStepKind, UnsupportedReason, VerifyStatus,
Confidence, EntryKind, Evidence, FlowStep, FlowStepKind, UnsupportedReason,
VerifyStatus,
};
use nyx_scanner::labels::Cap;
use nyx_scanner::patterns::{FindingCategory, Severity};
@ -275,6 +278,328 @@ mod python_fixture_tests {
}
}
// ── Phase 12 — per-shape acceptance ──────────────────────────────────────
//
// For each shape the suite asserts:
// 1. The vuln fixture confirms (oracle fires, sink hit).
// 2. The benign fixture does NOT confirm.
// 3. The emitted harness source matches the per-shape golden
// snapshot under `tests/dynamic_fixtures/python/<shape>/`.
//
// Framework-bound shapes (Flask / FastAPI / Django / Celery) skip
// with an `eprintln!` when the framework is unimportable in the
// host's `python3` (and therefore unavailable to the harness's
// built venv without a successful pip install).
fn python_module_available(module: &'static str) -> bool {
std::process::Command::new("python3")
.arg("-c")
.arg(format!("import {module}"))
.output()
.map(|o| o.status.success())
.unwrap_or(false)
}
fn assert_confirmed(shape: &str, result: &nyx_scanner::evidence::VerifyResult) {
assert_eq!(
result.status,
VerifyStatus::Confirmed,
"{shape}/vuln.py: expected Confirmed, got {:?} ({:?})",
result.status,
result.detail,
);
}
fn assert_not_confirmed(shape: &str, result: &nyx_scanner::evidence::VerifyResult) {
assert!(
matches!(
result.status,
VerifyStatus::NotConfirmed | VerifyStatus::Inconclusive
),
"{shape}/benign.py: expected NotConfirmed (or Inconclusive), got {:?} ({:?})",
result.status,
result.detail,
);
// Tighter check: a benign fixture must never light up `Confirmed`.
assert_ne!(
result.status,
VerifyStatus::Confirmed,
"{shape}/benign.py: must not confirm",
);
}
// ── generic ─────────────────────────────────────────────────────────────
#[test]
fn generic_vuln_is_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
let r = run_shape_fixture(
"generic", "vuln.py", "run_ping", Cap::CODE_EXEC, 12,
EntryKind::Function, PayloadSlot::Param(0),
);
assert_confirmed("generic", &r);
}
#[test]
fn generic_benign_not_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
let r = run_shape_fixture(
"generic", "benign.py", "run_ping", Cap::CODE_EXEC, 20,
EntryKind::Function, PayloadSlot::Param(0),
);
assert_not_confirmed("generic", &r);
}
#[test]
fn generic_harness_snapshot_matches_golden() {
run_harness_snapshot(
"generic", "vuln.py", "run_ping", Cap::CODE_EXEC, 12,
EntryKind::Function, PayloadSlot::Param(0),
);
}
// ── cli ─────────────────────────────────────────────────────────────────
#[test]
fn cli_vuln_is_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
let r = run_shape_fixture(
"cli", "vuln.py", "main", Cap::CODE_EXEC, 14,
EntryKind::CliSubcommand, PayloadSlot::Argv(0),
);
assert_confirmed("cli", &r);
}
#[test]
fn cli_benign_not_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
let r = run_shape_fixture(
"cli", "benign.py", "main", Cap::CODE_EXEC, 11,
EntryKind::CliSubcommand, PayloadSlot::Argv(0),
);
assert_not_confirmed("cli", &r);
}
#[test]
fn cli_harness_snapshot_matches_golden() {
run_harness_snapshot(
"cli", "vuln.py", "main", Cap::CODE_EXEC, 14,
EntryKind::CliSubcommand, PayloadSlot::Argv(0),
);
}
// ── pytest ──────────────────────────────────────────────────────────────
#[test]
fn pytest_vuln_is_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
let r = run_shape_fixture(
"pytest", "vuln.py", "test_run_ping", Cap::CODE_EXEC, 14,
EntryKind::Function, PayloadSlot::EnvVar("NYX_PAYLOAD".into()),
);
assert_confirmed("pytest", &r);
}
#[test]
fn pytest_benign_not_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
let r = run_shape_fixture(
"pytest", "benign.py", "test_run_ping", Cap::CODE_EXEC, 14,
EntryKind::Function, PayloadSlot::EnvVar("NYX_PAYLOAD".into()),
);
assert_not_confirmed("pytest", &r);
}
#[test]
fn pytest_harness_snapshot_matches_golden() {
run_harness_snapshot(
"pytest", "vuln.py", "test_run_ping", Cap::CODE_EXEC, 14,
EntryKind::Function, PayloadSlot::EnvVar("NYX_PAYLOAD".into()),
);
}
// ── async ───────────────────────────────────────────────────────────────
#[test]
fn async_vuln_is_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
let r = run_shape_fixture(
"async", "vuln.py", "run_ping", Cap::CODE_EXEC, 13,
EntryKind::Function, PayloadSlot::Param(0),
);
assert_confirmed("async", &r);
}
#[test]
fn async_benign_not_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
let r = run_shape_fixture(
"async", "benign.py", "run_ping", Cap::CODE_EXEC, 14,
EntryKind::Function, PayloadSlot::Param(0),
);
assert_not_confirmed("async", &r);
}
#[test]
fn async_harness_snapshot_matches_golden() {
run_harness_snapshot(
"async", "vuln.py", "run_ping", Cap::CODE_EXEC, 13,
EntryKind::Function, PayloadSlot::Param(0),
);
}
// ── celery ──────────────────────────────────────────────────────────────
#[test]
fn celery_vuln_is_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
if !python_module_available("celery") {
eprintln!("SKIP: celery not importable");
return;
}
let r = run_shape_fixture(
"celery", "vuln.py", "run_job", Cap::CODE_EXEC, 17,
EntryKind::Function, PayloadSlot::Param(0),
);
assert_confirmed("celery", &r);
}
#[test]
fn celery_benign_not_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
if !python_module_available("celery") {
eprintln!("SKIP: celery not importable");
return;
}
let r = run_shape_fixture(
"celery", "benign.py", "run_job", Cap::CODE_EXEC, 17,
EntryKind::Function, PayloadSlot::Param(0),
);
assert_not_confirmed("celery", &r);
}
#[test]
fn celery_harness_snapshot_matches_golden() {
run_harness_snapshot(
"celery", "vuln.py", "run_job", Cap::CODE_EXEC, 17,
EntryKind::Function, PayloadSlot::Param(0),
);
}
// ── flask ───────────────────────────────────────────────────────────────
#[test]
fn flask_vuln_is_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
if !python_module_available("flask") {
eprintln!("SKIP: flask not importable");
return;
}
let r = run_shape_fixture(
"flask", "vuln.py", "ping", Cap::CODE_EXEC, 18,
EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()),
);
assert_confirmed("flask", &r);
}
#[test]
fn flask_benign_not_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
if !python_module_available("flask") {
eprintln!("SKIP: flask not importable");
return;
}
let r = run_shape_fixture(
"flask", "benign.py", "ping", Cap::CODE_EXEC, 17,
EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()),
);
assert_not_confirmed("flask", &r);
}
#[test]
fn flask_harness_snapshot_matches_golden() {
run_harness_snapshot(
"flask", "vuln.py", "ping", Cap::CODE_EXEC, 18,
EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()),
);
}
// ── fastapi ─────────────────────────────────────────────────────────────
#[test]
fn fastapi_vuln_is_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
if !python_module_available("fastapi") {
eprintln!("SKIP: fastapi not importable");
return;
}
let r = run_shape_fixture(
"fastapi", "vuln.py", "ping", Cap::CODE_EXEC, 16,
EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()),
);
assert_confirmed("fastapi", &r);
}
#[test]
fn fastapi_benign_not_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
if !python_module_available("fastapi") {
eprintln!("SKIP: fastapi not importable");
return;
}
let r = run_shape_fixture(
"fastapi", "benign.py", "ping", Cap::CODE_EXEC, 16,
EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()),
);
assert_not_confirmed("fastapi", &r);
}
#[test]
fn fastapi_harness_snapshot_matches_golden() {
run_harness_snapshot(
"fastapi", "vuln.py", "ping", Cap::CODE_EXEC, 16,
EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()),
);
}
// ── django ──────────────────────────────────────────────────────────────
#[test]
fn django_vuln_is_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
if !python_module_available("django") {
eprintln!("SKIP: django not importable");
return;
}
let r = run_shape_fixture(
"django", "vuln.py", "ping", Cap::CODE_EXEC, 15,
EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()),
);
assert_confirmed("django", &r);
}
#[test]
fn django_benign_not_confirmed() {
if !python3_available() { eprintln!("SKIP: python3 not available"); return; }
if !python_module_available("django") {
eprintln!("SKIP: django not importable");
return;
}
let r = run_shape_fixture(
"django", "benign.py", "ping", Cap::CODE_EXEC, 14,
EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()),
);
assert_not_confirmed("django", &r);
}
#[test]
fn django_harness_snapshot_matches_golden() {
run_harness_snapshot(
"django", "vuln.py", "ping", Cap::CODE_EXEC, 15,
EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()),
);
}
/// Sensitive-filename gate fires before any harness execution; no
/// python3 needed.
#[test]