mirror of
https://github.com/elicpeter/nyx.git
synced 2026-06-21 20:18:06 +02:00
144 lines
5.1 KiB
Rust
144 lines
5.1 KiB
Rust
//! Phase 10 (Track D.3) — per-(lang, cap) stub end-to-end tests.
//!
//! These tests spin up a real boundary stub, splice the per-language
//! probe shim (which now carries the cap-specific
//! `__nyx_stub_*_record` helpers) ahead of a fixture's source, run the
//! resulting program with the stub's endpoint + recording-path env
//! vars set, then assert the stub captured the boundary event.
//!
//! Unlike `tests/stubs_per_cap.rs` (which synthesises harness
//! behaviour with host-side `SqlStub::record_query` calls), this suite
//! drives a real interpreter subprocess so the per-language shim
//! contract is exercised end-to-end. When the host is missing the
//! interpreter the test eprintln-skips, matching every other lang
//! fixture suite in-tree.
//!
//! Acceptance bullet from `.pitboss/play/deferred.md` Phase 10
//! follow-up: the Python+SQL pair is the cheapest first bite;
//! `sqlite3` is stdlib so no new toolchain dependency is required for
//! the dynamic CI matrix.
|
|
|
|
#![cfg(feature = "dynamic")]
|
|
|
|
use nyx_scanner::dynamic::lang::python::probe_shim as python_probe_shim;
|
|
use nyx_scanner::dynamic::stubs::{SqlStub, StubProvider};
|
|
use std::path::PathBuf;
|
|
use std::process::Command;
|
|
use tempfile::TempDir;
|
|
|
|
/// Reports whether a `python3` interpreter can be launched on this host.
///
/// Runs `python3 --version` and returns `true` only when the process was
/// spawned and exited successfully. A failure to spawn the process (for
/// example, no such binary on `PATH`) yields `false` rather than an error.
fn python3_available() -> bool {
    match Command::new("python3").arg("--version").output() {
        Ok(probe) => probe.status.success(),
        // Could not even start the interpreter: treat it as unavailable.
        Err(_) => false,
    }
}
|
|
|
|
fn fixture_path(rel: &str) -> PathBuf {
|
|
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
|
.join("tests")
|
|
.join("dynamic_fixtures")
|
|
.join("stubs_e2e")
|
|
.join(rel)
|
|
}
|
|
|
|
/// Runs the vulnerable Python SQL fixture behind the probe shim and checks
/// that the `SqlStub` records the tautology query issued by the fixture.
///
/// Skips (with a notice on stderr) when `python3` is not available.
#[test]
fn python_sql_stub_captures_tautology_query_via_shim_recorder() {
    if !python3_available() {
        eprintln!("SKIP: python3 not available");
        return;
    }

    // `scratch` is declared first so it outlives the stub that is started
    // inside it and the driver script written into it.
    let scratch = TempDir::new().expect("tempdir");
    let sql_stub = SqlStub::start(scratch.path()).expect("SqlStub::start");

    // The verifier publishes the SQLite DB path on `NYX_SQL_ENDPOINT`
    // (primary) and the queries-log path on `NYX_SQL_LOG` (companion);
    // the companion variable name and value come from the stub itself.
    let db_endpoint = sql_stub.endpoint();
    let log_binding = sql_stub
        .recording_endpoint()
        .expect("SqlStub must publish a recording endpoint");

    // Shim first, fixture second, so the generated program carries the
    // `__nyx_stub_sql_record` helper before the fixture code runs.
    // Mirrors the production `PythonEmitter::emit` ordering.
    let fixture_src =
        std::fs::read_to_string(fixture_path("python/sql/vuln/main.py")).expect("read fixture");
    let shim: &str = python_probe_shim();
    let program = [shim, "\n# ── fixture begins ─\n", fixture_src.as_str()].concat();

    let driver = scratch.path().join("driver.py");
    std::fs::write(&driver, program).expect("write driver");

    let mut interpreter = Command::new("python3");
    interpreter
        .arg(&driver)
        .env("NYX_SQL_ENDPOINT", &db_endpoint)
        .env(log_binding.0, &log_binding.1);
    let run = interpreter.output().expect("python3 driver");
    assert!(
        run.status.success(),
        "driver must exit 0; stderr = {}",
        String::from_utf8_lossy(&run.stderr)
    );

    let captured = sql_stub.drain_events();
    assert!(
        !captured.is_empty(),
        "SqlStub must capture at least one event after the shim recorder fires"
    );

    // Pick out the event whose summary carries the injected tautology.
    let hit = captured
        .iter()
        .find(|event| event.summary.contains("OR 1=1"))
        .expect("recorded query must contain the tautology marker");
    let driver_detail = hit.detail.get("driver").map(String::as_str);
    assert_eq!(
        driver_detail,
        Some("sqlite3"),
        "kwargs passed to __nyx_stub_sql_record must surface as event detail entries"
    );
}
|
|
|
|
/// Runs the same Python SQL fixture with `NYX_SQL_LOG` withheld and checks
/// that the driver still exits successfully and the stub records nothing.
///
/// Skips (with a notice on stderr) when `python3` is not available.
#[test]
fn python_sql_shim_recorder_is_noop_without_log_env() {
    if !python3_available() {
        eprintln!("SKIP: python3 not available");
        return;
    }

    // `scratch` is declared first so it outlives the stub that is started
    // inside it and the driver script written into it.
    let scratch = TempDir::new().expect("tempdir");
    let sql_stub = SqlStub::start(scratch.path()).expect("SqlStub::start");

    // Drive the same fixture but withhold NYX_SQL_LOG. The shim
    // helper must be a no-op so the same source still runs cleanly
    // under harness modes that didn't spawn a stub.
    let db_endpoint = sql_stub.endpoint();
    let fixture_src =
        std::fs::read_to_string(fixture_path("python/sql/vuln/main.py")).expect("read fixture");
    let program = format!("{}\n{}", python_probe_shim(), fixture_src);

    let driver = scratch.path().join("driver_no_log.py");
    std::fs::write(&driver, program).expect("write driver");

    let mut interpreter = Command::new("python3");
    interpreter
        .arg(&driver)
        .env("NYX_SQL_ENDPOINT", &db_endpoint)
        // Strip the variable explicitly in case the parent environment has it.
        .env_remove("NYX_SQL_LOG");
    let run = interpreter.output().expect("python3 driver");
    assert!(
        run.status.success(),
        "driver must exit 0 even without NYX_SQL_LOG; stderr = {}",
        String::from_utf8_lossy(&run.stderr)
    );

    let captured = sql_stub.drain_events();
    assert!(
        captured.is_empty(),
        "no events expected when the recording env var is unset, got {} entries",
        captured.len()
    );
}
|