nyx/tests/chain_emission_e2e.rs
2026-06-05 10:16:30 -05:00

332 lines
12 KiB
Rust

//! End-to-end chain-composer regression test.
//!
//! Drives the built `nyx` binary against fixture projects crafted to
//! exercise the chain composer and asserts the JSON output carries at
//! least one entry in the top-level `chains` array. Complements the
//! synthetic-input integration tests under `tests/chain_emission.rs` and
//! `tests/chain_reverify.rs` (which drive `find_chains` / `compose_chain`
//! directly) by closing the wire-format loop: a chain that drops out of
//! `find_chains` must still land in the scan command's output.
//!
//! Fixture acceptance contract (one per language under
//! `tests/dynamic_fixtures/chain_composer/<lang>/<scenario>/`):
//!
//! - The scanner must produce at least one `findings[]` entry.
//! - The scanner must produce at least one `chains[]` entry.
//! - The top chain's `severity` must be `critical` or `high`.
//! - The top chain's `members` array must be non-empty.
//!
//! New scenarios drop their root directory into [`SCENARIOS`] below.
use assert_cmd::Command;
use serde_json::Value;
use std::path::Path;
use std::path::PathBuf;
struct Scenario {
/// Path relative to `tests/dynamic_fixtures/chain_composer/`.
rel_path: &'static str,
/// Required `implied_impact` value on at least one emitted chain.
/// `None` skips the impact assertion (kept as an escape hatch for
/// future scenarios where the lattice match is intentionally a
/// different category).
required_impact: Option<&'static str>,
}
const SCENARIOS: &[Scenario] = &[Scenario {
rel_path: "python/flask_eval",
required_impact: Some("rce"),
}];
fn fixture_root(rel: &str) -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests/dynamic_fixtures/chain_composer")
.join(rel)
}
fn nyx_scan_cmd(home: &Path, root: &Path) -> Command {
let mut cmd = Command::cargo_bin("nyx").expect("nyx binary");
cmd.env("HOME", home)
.env("XDG_CONFIG_HOME", home.join(".config"))
.env("XDG_DATA_HOME", home.join(".local/share"))
.env("NO_COLOR", "1")
.args(["scan", "--format", "json"])
.arg(root);
cmd
}
fn run_scan_json(root: &Path) -> Value {
let home = tempfile::tempdir().expect("temp home");
let assert = nyx_scan_cmd(home.path(), root).assert().success();
let stdout = String::from_utf8(assert.get_output().stdout.clone())
.expect("nyx scan stdout is valid UTF-8");
serde_json::from_str(&stdout).unwrap_or_else(|e| {
panic!(
"nyx scan --format json produced invalid JSON for {}: {e}\n--- stdout ---\n{}\n",
root.display(),
stdout
)
})
}
#[test]
fn every_chain_composer_scenario_emits_at_least_one_chain() {
assert!(
!SCENARIOS.is_empty(),
"SCENARIOS table must list at least one fixture"
);
for scenario in SCENARIOS {
let root = fixture_root(scenario.rel_path);
assert!(
root.is_dir(),
"fixture root missing for scenario {}: {}",
scenario.rel_path,
root.display()
);
let value = run_scan_json(&root);
let findings = value
.get("findings")
.and_then(Value::as_array)
.unwrap_or_else(|| {
panic!(
"scenario {}: `findings` array missing from scan output",
scenario.rel_path
)
});
assert!(
!findings.is_empty(),
"scenario {}: expected at least one finding, got 0. Scan output:\n{}",
scenario.rel_path,
serde_json::to_string_pretty(&value).unwrap_or_default()
);
let chains = value
.get("chains")
.and_then(Value::as_array)
.unwrap_or_else(|| {
panic!(
"scenario {}: `chains` array missing from scan output",
scenario.rel_path
)
});
assert!(
!chains.is_empty(),
"scenario {}: expected at least one composed chain, got 0. \
Scan output:\n{}",
scenario.rel_path,
serde_json::to_string_pretty(&value).unwrap_or_default()
);
let top = &chains[0];
let severity = top
.get("severity")
.and_then(Value::as_str)
.unwrap_or("<missing>");
assert!(
matches!(severity, "critical" | "high"),
"scenario {}: top chain severity must be critical or high, \
got {severity:?}. Chain:\n{}",
scenario.rel_path,
serde_json::to_string_pretty(top).unwrap_or_default()
);
let members = top
.get("members")
.and_then(Value::as_array)
.unwrap_or_else(|| {
panic!(
"scenario {}: top chain has no `members` array",
scenario.rel_path
)
});
assert!(
!members.is_empty(),
"scenario {}: top chain must have at least one member",
scenario.rel_path
);
if let Some(expected) = scenario.required_impact {
let any_match = chains.iter().any(|c| {
c.get("implied_impact")
.and_then(Value::as_str)
.is_some_and(|v| v == expected)
});
assert!(
any_match,
"scenario {}: no chain carried implied_impact={expected:?}. \
Chains:\n{}",
scenario.rel_path,
serde_json::to_string_pretty(chains).unwrap_or_default()
);
}
}
}
/// Locks the scan-pipeline wiring contract: when dynamic verification is
/// enabled (default), the composite chain re-verifier runs after the
/// chain-composition pass and stamps each top-N chain's
/// `dynamic_verdict` so downstream consumers (`build_findings_json`,
/// `build_sarif_with_chains`, console renderer) see a populated field.
///
/// The verdict's *status* depends on the host's Python toolchain: when
/// `python3 -m venv` succeeds and the per-language chain-step harness
/// runs, the verdict resolves to `Confirmed`; when the toolchain is
/// missing it falls through to `Inconclusive(BackendInsufficient)`.
/// This test asserts only the wiring contract — that the field is
/// populated and the detail string reports coverage — so it stays green
/// on any host with a working `nyx` binary.
///
/// Gated on `feature = "dynamic"` because the reverifier lives behind
/// that flag.
#[cfg(feature = "dynamic")]
#[test]
fn flask_eval_chain_reverify_populates_dynamic_verdict() {
let root = fixture_root("python/flask_eval");
let value = run_scan_json(&root);
let chains = value
.get("chains")
.and_then(Value::as_array)
.expect("`chains` array missing from scan output");
assert!(!chains.is_empty(), "expected at least one composed chain");
let top = &chains[0];
let dv = top
.get("dynamic_verdict")
.expect("`dynamic_verdict` key missing from top chain");
assert!(
!dv.is_null(),
"top chain `dynamic_verdict` was null; wiring did not fire. Chain:\n{}",
serde_json::to_string_pretty(top).unwrap_or_default()
);
let status = dv
.get("status")
.and_then(Value::as_str)
.expect("verdict missing `status`");
assert!(
matches!(status, "Confirmed" | "Inconclusive" | "Unsupported"),
"unexpected verdict status: {status:?}"
);
let detail = dv
.get("detail")
.and_then(Value::as_str)
.expect("verdict missing `detail`");
for segment in ["derived", "built", "ran"] {
assert!(
detail.contains(segment),
"verdict detail missing `{segment}` coverage segment: {detail:?}"
);
}
}
/// Locks the Phase 31 telemetry stability stamping contract: when
/// `NYX_VERIFY_REPLAY_STABLE=1` is set and the chain reverifier resolves
/// to `Confirmed`, the verdict's `replay_stable` field is populated.
/// Without the env var, `replay_stable` stays `null`.
///
/// Status-agnostic: when the host's Python toolchain is missing the
/// reverifier never reaches its `Confirmed` branch and `replay_stable`
/// stays `null` in both arms — the test then asserts only the absence-
/// path contract under both env-var settings so it stays green on
/// toolchain-free hosts. When `Confirmed` *does* fire, the env-var-set
/// arm must carry `Some(true|false)`.
#[cfg(feature = "dynamic")]
#[test]
fn flask_eval_chain_replay_stable_honours_opt_in() {
let root = fixture_root("python/flask_eval");
// Arm 1: env var unset → replay_stable must be null on the top chain
// regardless of verdict status.
let home_off = tempfile::tempdir().expect("temp home");
let assert_off = nyx_scan_cmd(home_off.path(), &root)
.env_remove("NYX_VERIFY_REPLAY_STABLE")
.assert()
.success();
let value_off: Value = serde_json::from_slice(&assert_off.get_output().stdout)
.expect("nyx scan --format json produced invalid JSON (arm off)");
let top_off = value_off
.get("chains")
.and_then(Value::as_array)
.and_then(|c| c.first())
.expect("expected at least one composed chain (arm off)");
let dv_off = top_off
.get("dynamic_verdict")
.expect("dynamic_verdict missing (arm off)");
let replay_off = dv_off.get("replay_stable");
assert!(
matches!(replay_off, None | Some(Value::Null)),
"replay_stable should be absent or null when opt-in is off; got {replay_off:?}"
);
// Arm 2: env var set → replay_stable must be populated when the
// verdict is Confirmed. When the toolchain is missing the verdict
// stays Inconclusive and replay_stable stays null; both branches
// are valid wiring outcomes.
let home_on = tempfile::tempdir().expect("temp home");
let assert_on = nyx_scan_cmd(home_on.path(), &root)
.env("NYX_VERIFY_REPLAY_STABLE", "1")
.assert()
.success();
let value_on: Value = serde_json::from_slice(&assert_on.get_output().stdout)
.expect("nyx scan --format json produced invalid JSON (arm on)");
let top_on = value_on
.get("chains")
.and_then(Value::as_array)
.and_then(|c| c.first())
.expect("expected at least one composed chain (arm on)");
let dv_on = top_on
.get("dynamic_verdict")
.expect("dynamic_verdict missing (arm on)");
let status_on = dv_on
.get("status")
.and_then(Value::as_str)
.expect("verdict missing status (arm on)");
let replay_on = dv_on.get("replay_stable");
if status_on == "Confirmed" {
assert!(
matches!(replay_on, Some(Value::Bool(_))),
"replay_stable must be populated when opt-in is on and verdict is Confirmed; got {replay_on:?}"
);
} else {
assert!(
matches!(replay_on, None | Some(Value::Null) | Some(Value::Bool(_))),
"replay_stable should be absent, null, or a bool; got {replay_on:?}"
);
}
}
/// Mirror of the above: with `--no-verify` the chain-reverify pass is
/// skipped and `dynamic_verdict` stays `null`. Locks the cost-control
/// contract: users who opt out of dynamic verification do not pay the
/// per-chain build / sandbox cost.
#[cfg(feature = "dynamic")]
#[test]
fn flask_eval_chain_dynamic_verdict_is_null_when_verify_disabled() {
let root = fixture_root("python/flask_eval");
let home = tempfile::tempdir().expect("temp home");
let assert = nyx_scan_cmd(home.path(), &root)
.arg("--no-verify")
.assert()
.success();
let stdout = String::from_utf8(assert.get_output().stdout.clone())
.expect("nyx scan stdout is valid UTF-8");
let value: Value =
serde_json::from_str(&stdout).expect("nyx scan --format json produced invalid JSON");
let chains = value
.get("chains")
.and_then(Value::as_array)
.expect("`chains` array missing");
assert!(!chains.is_empty());
let top = &chains[0];
let dv = top.get("dynamic_verdict");
assert!(
matches!(dv, None | Some(Value::Null)),
"top chain `dynamic_verdict` should be absent or null under --no-verify; got {dv:?}"
);
}