From 96eb37500ce47025c0c34b6d0f0e67a475a05737 Mon Sep 17 00:00:00 2001 From: pitboss Date: Thu, 14 May 2026 15:30:12 -0500 Subject: [PATCH] =?UTF-8?q?[pitboss]=20phase=2012:=20Track=20B=20=E2=80=94?= =?UTF-8?q?=20Python=20harness=20emitter=20shapes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/dynamic/lang/python.rs | 948 +++++++++++++++--- tests/common/fixture_harness.rs | 236 ++++- tests/dynamic_fixtures/python/async/benign.py | 22 + tests/dynamic_fixtures/python/async/vuln.py | 21 + .../python/async/vuln.py.golden_harness.py | 180 ++++ .../dynamic_fixtures/python/celery/benign.py | 25 + tests/dynamic_fixtures/python/celery/vuln.py | 25 + .../python/celery/vuln.py.golden_harness.py | 183 ++++ tests/dynamic_fixtures/python/cli/benign.py | 26 + tests/dynamic_fixtures/python/cli/vuln.py | 26 + .../python/cli/vuln.py.golden_harness.py | 188 ++++ .../dynamic_fixtures/python/django/benign.py | 21 + tests/dynamic_fixtures/python/django/vuln.py | 22 + .../python/django/vuln.py.golden_harness.py | 228 +++++ .../dynamic_fixtures/python/fastapi/benign.py | 23 + tests/dynamic_fixtures/python/fastapi/vuln.py | 23 + .../python/fastapi/vuln.py.golden_harness.py | 234 +++++ tests/dynamic_fixtures/python/flask/benign.py | 24 + tests/dynamic_fixtures/python/flask/vuln.py | 25 + .../python/flask/vuln.py.golden_harness.py | 232 +++++ .../dynamic_fixtures/python/generic/benign.py | 28 + tests/dynamic_fixtures/python/generic/vuln.py | 20 + .../python/generic/vuln.py.golden_harness.py | 178 ++++ .../dynamic_fixtures/python/pytest/benign.py | 22 + tests/dynamic_fixtures/python/pytest/vuln.py | 22 + .../python/pytest/vuln.py.golden_harness.py | 181 ++++ .../spec_strategies/callgraph_entry_http.rs | 12 + tests/python_fixtures.rs | 329 +++++- tests/spec_derivation_strategies.rs | 12 +- 29 files changed, 3394 insertions(+), 122 deletions(-) create mode 100644 tests/dynamic_fixtures/python/async/benign.py create mode 100644 tests/dynamic_fixtures/python/async/vuln.py create mode 100644 tests/dynamic_fixtures/python/async/vuln.py.golden_harness.py create mode 100644 tests/dynamic_fixtures/python/celery/benign.py create mode 100644 tests/dynamic_fixtures/python/celery/vuln.py create mode 100644 tests/dynamic_fixtures/python/celery/vuln.py.golden_harness.py create mode 100644 tests/dynamic_fixtures/python/cli/benign.py create mode 100644 tests/dynamic_fixtures/python/cli/vuln.py create mode 100644 tests/dynamic_fixtures/python/cli/vuln.py.golden_harness.py create mode 100644 tests/dynamic_fixtures/python/django/benign.py create mode 100644 tests/dynamic_fixtures/python/django/vuln.py create mode 100644 tests/dynamic_fixtures/python/django/vuln.py.golden_harness.py create mode 100644 tests/dynamic_fixtures/python/fastapi/benign.py create mode 100644 tests/dynamic_fixtures/python/fastapi/vuln.py create mode 100644 tests/dynamic_fixtures/python/fastapi/vuln.py.golden_harness.py create mode 100644 tests/dynamic_fixtures/python/flask/benign.py create mode 100644 tests/dynamic_fixtures/python/flask/vuln.py create mode 100644 tests/dynamic_fixtures/python/flask/vuln.py.golden_harness.py create mode 100644 tests/dynamic_fixtures/python/generic/benign.py create mode 100644 tests/dynamic_fixtures/python/generic/vuln.py create mode 100644 tests/dynamic_fixtures/python/generic/vuln.py.golden_harness.py create mode 100644 tests/dynamic_fixtures/python/pytest/benign.py create mode 100644 tests/dynamic_fixtures/python/pytest/vuln.py create mode 100644 tests/dynamic_fixtures/python/pytest/vuln.py.golden_harness.py create mode 100644 tests/dynamic_fixtures/spec_strategies/callgraph_entry_http.rs diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index b358a82f..aa555b94 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -1,32 +1,51 @@ //! Python harness emitter. //! -//! Generates a Python script that: -//! 1. Reads the payload from `NYX_PAYLOAD` env var. -//! 2. Installs a `sys.settrace`-based probe at the sink call site -//! (`spec.sink_file:spec.sink_line`) that prints `__NYX_SINK_HIT__`. -//! 3. Imports the entry module and calls the entry function with the -//! payload routed to the correct parameter slot. -//! 4. Catches all exceptions to prevent harness crashes from masking results. +//! Phase 12 (Track B Python vertical) replaces the single legacy +//! `emit` body with dispatch over [`PythonShape`] — the cross product of +//! [`EntryKind`] and a lightweight per-file shape detector that inspects +//! the entry file for framework decorators / CLI gates / async / pytest +//! conventions. Each shape returns its own [`HarnessSource`] but shares +//! the Phase 06 probe shim ([`probe_shim`]) and payload prelude so the +//! sink-reachability oracle works uniformly across shapes. +//! +//! Detection is best-effort: when the entry file is unreadable or no +//! shape matches, the emitter falls back to [`PythonShape::Generic`], +//! which preserves the pre-Phase-12 behaviour (call the entry function +//! positionally with the payload). The dispatch never returns an +//! emitter-side error for an unknown shape — that responsibility belongs +//! to `lang::emit`, which has already gated on +//! [`EntryKind`] via [`PythonEmitter::entry_kinds_supported`]. //! //! Payload slot support: -//! - `PayloadSlot::Param(n)` — n-th positional argument. -//! - `PayloadSlot::EnvVar(name)` — set env var before calling. -//! - Other slots produce `UnsupportedReason::PayloadSlotUnsupported`. +//! - [`PayloadSlot::Param`] — n-th positional argument. +//! - [`PayloadSlot::EnvVar`] — set env var before calling. +//! - [`PayloadSlot::Stdin`] — buffer payload onto `sys.stdin`. +//! - Other slots produce [`UnsupportedReason::PayloadSlotUnsupported`]. use crate::dynamic::environment::{Environment, RuntimeArtifacts}; use crate::dynamic::lang::{HarnessSource, LangEmitter}; use crate::dynamic::spec::{EntryKind, HarnessSpec, PayloadSlot}; use crate::evidence::UnsupportedReason; use crate::utils::project::DetectedFramework; +use std::path::PathBuf; /// Zero-sized [`LangEmitter`] handle for Python. Registered in the /// `lang::dispatch` table; method bodies delegate to the existing free /// functions in this module. pub struct PythonEmitter; -/// Entry kinds the Python emitter currently understands. Extended in Phase 12 -/// (Track B Python vertical) to include `HttpRoute`, `CliSubcommand`, etc. -const SUPPORTED: &[EntryKind] = &[EntryKind::Function]; +/// Entry kinds the Python emitter understands after Phase 12. +/// +/// `HttpRoute` covers Flask / FastAPI / Django views. `CliSubcommand` +/// covers `if __name__ == "__main__":` entries and explicit click / +/// argparse `main()` functions. `Function` covers pytest, async +/// coroutines, Celery tasks, and generic module-level functions +/// (positional + kwargs). +const SUPPORTED: &[EntryKind] = &[ + EntryKind::Function, + EntryKind::HttpRoute, + EntryKind::CliSubcommand, +]; impl LangEmitter for PythonEmitter { fn emit(&self, spec: &HarnessSpec) -> Result { @@ -39,23 +58,163 @@ impl LangEmitter for PythonEmitter { fn entry_kind_hint(&self, attempted: EntryKind) -> String { format!( - "python emitter supports {SUPPORTED:?}; this finding's enclosing context is `EntryKind::{attempted}` — Track B will add framework + CLI shapes in phase 12" + "python emitter supports {SUPPORTED:?}; this finding's enclosing context is `EntryKind::{attempted}` — see Phase 12 shape dispatch" ) } - /// Phase 09 — Track D.2: emit a pinned `requirements.txt` (and a - /// matching `pyproject.toml` stub when `pyproject.toml` is the - /// project's canonical manifest) covering every captured direct dep - /// plus the framework deps inferred from the project manifest. fn materialize_runtime(&self, env: &Environment) -> RuntimeArtifacts { materialize_python(env) } } +// ── Phase 12: shape detector ───────────────────────────────────────────────── + +/// Concrete per-file shape resolved by reading the entry source. +/// +/// One harness template per variant. When the entry file is unreadable +/// or no marker fires the detector defaults to [`PythonShape::Generic`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PythonShape { + /// Flask `@app.route` / blueprint route. Harness uses + /// `app.test_client()` to dispatch a request to the route. + FlaskRoute, + /// FastAPI `@app.get` / `@router.post` / etc. Harness uses + /// `starlette.testclient.TestClient` to drive the route. + FastApiRoute, + /// Django view (function or `View`/`APIView` method). Harness + /// instantiates a `django.test.RequestFactory` and calls the view. + DjangoView, + /// `if __name__ == "__main__":` script entry or top-level `main()`. + /// Harness sets `sys.argv` and re-imports under `__main__` semantics. + CliEntry, + /// `def test_*(...)` pytest function. Harness imports and calls + /// directly — no pytest runner needed because we drive a single test. + PytestFunction, + /// `async def` coroutine. Harness wraps the call in `asyncio.run`. + AsyncCoroutine, + /// `@app.task` / `@celery.task` Celery task. Harness calls the + /// underlying function directly (eager mode) — Celery's broker is + /// not required for in-process invocation. + CeleryTask, + /// Generic module-level function — positional argument by default, + /// keyword-argument fallback when `PayloadSlot::EnvVar` carries the + /// kwarg name. Backwards-compatible with pre-Phase-12 behaviour. + Generic, +} + +impl PythonShape { + /// Detect the shape from `(spec, source)`. `source` is the literal + /// bytes of the entry file (best-effort — if it could not be read, + /// pass an empty string and the function returns [`Self::Generic`]). + /// + /// Framework detection (Flask / FastAPI / Django) wins over the + /// [`EntryKind`] axis: when the source clearly imports one of those + /// frameworks the route shape is selected even if the spec + /// derivation pipeline tagged the entry kind as + /// [`EntryKind::Function`]. This makes the dispatcher robust + /// against the synthetic flow-step path used by tests and against + /// the legacy substring-only entry-kind heuristic. + pub fn detect(spec: &HarnessSpec, source: &str) -> Self { + let entry = spec.entry_name.as_str(); + let kind = spec.entry_kind; + + // ── Framework-first detection ──────────────────────────────── + let has_flask = + source_has_marker(source, &["from flask", "import flask", "Flask("]); + let has_fastapi = source_has_marker( + source, + &["from fastapi", "import fastapi", "FastAPI(", "APIRouter("], + ); + let has_django = source_has_marker( + source, + &[ + "from django", + "import django", + "django.http", + "urlpatterns", + "APIView", + "django.views", + ], + ); + + // FastAPI takes precedence when both fastapi + starlette imports + // show up (FastAPI imports starlette transitively); same for + // Flask vs werkzeug. Django is mutually exclusive in practice. + if has_fastapi { + return Self::FastApiRoute; + } + if has_django { + return Self::DjangoView; + } + if has_flask { + return Self::FlaskRoute; + } + + if kind == EntryKind::HttpRoute { + // The flow-step said HTTP but no framework import was + // detected — fall back to Flask which has the most forgiving + // test client wiring. + return Self::FlaskRoute; + } + + if kind == EntryKind::CliSubcommand + || entry == "main" + || entry == "__main__" + || source.contains("if __name__ == \"__main__\"") + || source.contains("if __name__ == '__main__'") + { + return Self::CliEntry; + } + + if entry.starts_with("test_") && function_is_pytest(source, entry) { + return Self::PytestFunction; + } + + if function_is_celery_task(source, entry) { + return Self::CeleryTask; + } + + if function_is_async(source, entry) { + return Self::AsyncCoroutine; + } + + Self::Generic + } +} + +fn source_has_marker(source: &str, markers: &[&str]) -> bool { + markers.iter().any(|m| source.contains(m)) +} + +fn function_is_pytest(source: &str, name: &str) -> bool { + let needle = format!("def {name}("); + let async_needle = format!("async def {name}("); + (source.contains(&needle) || source.contains(&async_needle)) + && name.starts_with("test_") +} + +fn function_is_async(source: &str, name: &str) -> bool { + source.contains(&format!("async def {name}(")) +} + +fn function_is_celery_task(source: &str, name: &str) -> bool { + let def_needle = format!("def {name}("); + if !source.contains(&def_needle) { + return false; + } + let has_celery_import = source.contains("from celery") || source.contains("import celery"); + let has_task_decorator = source.contains("@app.task") + || source.contains("@celery.task") + || source.contains("@shared_task"); + has_celery_import && has_task_decorator +} + +// ── Probe shim (Phase 06 + Phase 08) ───────────────────────────────────────── + /// Source of the `__nyx_probe` shim for the Python harness. /// -/// The shim is callable as `__nyx_probe("sink.callee", arg0, arg1, ...)`. -/// It emits one JSON line per call to `NYX_PROBE_PATH` (when set) in the +/// Callable as `__nyx_probe("sink.callee", arg0, arg1, ...)`. Emits one +/// JSON line per call to `NYX_PROBE_PATH` (when set) in the /// [`crate::dynamic::probe::SinkProbe`] schema. No-op when the env var /// is unset, so the shim is safe to inject even when the runner has not /// configured a probe channel. @@ -178,23 +337,15 @@ def __nyx_install_crash_guard(sink_callee): "# } -/// Phase 09 - Track D.2: synthesise a `requirements.txt` from the +// ── Runtime / requirements.txt synthesis (Phase 09) ───────────────────────── + +/// Phase 09 — Track D.2: synthesise a `requirements.txt` from the /// captured deps in `env`. -/// -/// The output is a deterministic, alphabetised listing of every -/// non-stdlib direct dep the entry file imported plus the framework deps -/// inferred from the manifest detector. Each entry is emitted as the -/// canonical pip-installable name; version pins are intentionally -/// omitted so the system pip resolves the latest compatible release -/// against the user's pinned Python interpreter (the spec's -/// `toolchain_id` field). A future phase can fold pinned versions in -/// once the capture pass learns to parse the project's own lockfile. pub fn materialize_python(env: &Environment) -> RuntimeArtifacts { let mut artifacts = RuntimeArtifacts::new(); let mut deps: Vec = Vec::new(); let mut seen: std::collections::HashSet = std::collections::HashSet::new(); - // Direct imports first — these mirror the entry file faithfully. for d in &env.direct_deps { if is_python_stdlib(d) { continue; @@ -204,9 +355,6 @@ pub fn materialize_python(env: &Environment) -> RuntimeArtifacts { deps.push(canonical); } } - // Framework deps next — these may not appear as direct imports in - // every entry file, but they have to be installed for the runtime - // to resolve framework decorators. for fw in &env.frameworks { if let Some(name) = python_framework_pkg_name(*fw) { let canonical = canonical_python_pkg_name(name); @@ -232,9 +380,6 @@ pub fn materialize_python(env: &Environment) -> RuntimeArtifacts { artifacts } -/// Returns true when `name` is a Python standard-library top-level -/// package. Conservative: matches the names the harness build path -/// would silently drop from `requirements.txt` anyway. fn is_python_stdlib(name: &str) -> bool { matches!( name, @@ -313,8 +458,6 @@ fn is_python_stdlib(name: &str) -> bool { ) } -/// Canonicalise common Python pkg aliases to their PyPI distribution -/// name (e.g. `cv2` → `opencv-python`). fn canonical_python_pkg_name(name: &str) -> String { let lower = name.to_ascii_lowercase(); match lower.as_str() { @@ -335,35 +478,93 @@ fn python_framework_pkg_name(fw: DetectedFramework) -> Option<&'static str> { } } +// ── Public entry: emit() ───────────────────────────────────────────────────── + /// Emit a Python harness for `spec`. +/// +/// Reads `spec.entry_file` from disk (best-effort), resolves the +/// concrete [`PythonShape`] via [`PythonShape::detect`], and dispatches +/// to the matching per-shape emitter. When the file cannot be read the +/// dispatcher falls back to [`PythonShape::Generic`], preserving the +/// pre-Phase-12 behaviour. pub fn emit(spec: &HarnessSpec) -> Result { - // Validate payload slot. match &spec.payload_slot { - PayloadSlot::Param(_) | PayloadSlot::EnvVar(_) | PayloadSlot::Stdin => {} - _ => return Err(UnsupportedReason::PayloadSlotUnsupported), + PayloadSlot::Param(_) | PayloadSlot::EnvVar(_) | PayloadSlot::Stdin + | PayloadSlot::QueryParam(_) | PayloadSlot::HttpBody | PayloadSlot::Argv(_) => {} } - let source = generate_source(spec); + let entry_source = read_entry_source(&spec.entry_file); + let shape = PythonShape::detect(spec, &entry_source); + let body = generate_for_shape(spec, shape); Ok(HarnessSource { - source, + source: body, filename: "harness.py".to_owned(), command: vec!["python3".to_owned(), "harness.py".to_owned()], - extra_files: vec![], + extra_files: extra_files_for_shape(shape), entry_subpath: None, }) } -fn generate_source(spec: &HarnessSpec) -> String { +/// Public wrapper to detect the shape for a finalised `HarnessSpec`, +/// reading the entry file from disk. Exposed so test helpers can pin a +/// per-fixture shape without round-tripping through [`emit`]. +pub fn detect_shape(spec: &HarnessSpec) -> PythonShape { + let entry_source = read_entry_source(&spec.entry_file); + PythonShape::detect(spec, &entry_source) +} + +fn read_entry_source(entry_file: &str) -> String { + let candidates = [ + PathBuf::from(entry_file), + PathBuf::from(".").join(entry_file), + ]; + for path in &candidates { + if let Ok(s) = std::fs::read_to_string(path) { + return s; + } + } + String::new() +} + +fn extra_files_for_shape(shape: PythonShape) -> Vec<(String, String)> { + match shape { + PythonShape::FlaskRoute => vec![("requirements.txt".to_owned(), "Flask\n".to_owned())], + PythonShape::FastApiRoute => vec![( + "requirements.txt".to_owned(), + "fastapi\nhttpx\n".to_owned(), + )], + PythonShape::DjangoView => vec![("requirements.txt".to_owned(), "Django\n".to_owned())], + PythonShape::CeleryTask => vec![("requirements.txt".to_owned(), "celery\n".to_owned())], + // Generic / CLI / Pytest / Async use the stdlib only. + _ => vec![], + } +} + +fn generate_for_shape(spec: &HarnessSpec, shape: PythonShape) -> String { + let preamble = harness_preamble(spec); + let body = match shape { + PythonShape::Generic => emit_generic(spec), + PythonShape::CliEntry => emit_cli(spec), + PythonShape::PytestFunction => emit_pytest(spec), + PythonShape::AsyncCoroutine => emit_async(spec), + PythonShape::CeleryTask => emit_celery(spec), + PythonShape::FlaskRoute => emit_flask(spec), + PythonShape::FastApiRoute => emit_fastapi(spec), + PythonShape::DjangoView => emit_django(spec), + }; + let postamble = harness_postamble(); + format!("{preamble}\n{body}\n{postamble}") +} + +/// Shared preamble: shebang, imports, probe shim, sink-line tracer, +/// payload loading, and entry-module import. Every shape body assumes +/// `payload`, `_payload_raw`, and `_entry_mod` are in scope. +fn harness_preamble(spec: &HarnessSpec) -> String { let entry_module = module_name(&spec.entry_file); - let entry_fn = &spec.entry_name; let sink_file = &spec.sink_file; let sink_line = spec.sink_line; - - // Build the call expression based on payload slot. - let (pre_call, call_expr) = build_call(spec, entry_module, entry_fn); let probe = probe_shim(); - format!( r#"#!/usr/bin/env python3 """Nyx dynamic harness — auto-generated, do not edit.""" @@ -372,9 +573,6 @@ import sys import traceback # ── Sink-reachability probe (sys.settrace) ──────────────────────────────────── -# Fires __NYX_SINK_HIT__ exactly once when the traced function is called at -# the expected file:line. Filtered to avoid false positives from library code. - {probe} _NYX_SINK_FILE = {sink_file:?} @@ -384,7 +582,6 @@ _NYX_SINK_HIT = False def _nyx_tracer(frame, event, arg): global _NYX_SINK_HIT if not _NYX_SINK_HIT and event == "line": - # Normalise path for comparison (basename match as fallback). fname = frame.f_code.co_filename if fname == _NYX_SINK_FILE or fname.endswith(_NYX_SINK_FILE) or ( os.path.basename(fname) == os.path.basename(_NYX_SINK_FILE) @@ -397,36 +594,41 @@ def _nyx_tracer(frame, event, arg): sys.settrace(_nyx_tracer) # ── Payload loading ──────────────────────────────────────────────────────────── -# Primary: raw bytes from NYX_PAYLOAD; fallback: base64 from NYX_PAYLOAD_B64. - _payload_raw = os.environb.get(b"NYX_PAYLOAD", b"") if not _payload_raw: import base64 _payload_b64 = os.environ.get("NYX_PAYLOAD_B64", "") if _payload_b64: _payload_raw = base64.b64decode(_payload_b64) - -# Decode payload to str (best-effort; use latin-1 as lossless fallback). try: payload = _payload_raw.decode("utf-8") except UnicodeDecodeError: payload = _payload_raw.decode("latin-1") # ── Entry module import ──────────────────────────────────────────────────────── -# The entry file is mounted at the harness workdir as the module. -# sys.path is extended to include the workdir so relative imports work. sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, ".") - try: import {entry_module} as _entry_mod except ImportError as _e: print(f"NYX_IMPORT_ERROR: {{_e}}", file=sys.stderr, flush=True) - sys.exit(77) # Distinct exit code: import failed + sys.exit(77) +"# + ) +} -# ── Pre-call setup ───────────────────────────────────────────────────────────── +fn harness_postamble() -> &'static str { + // Ensure probe fires for line-range matches on late-called sinks. + "sys.settrace(None)\n" +} + +// ── Per-shape bodies ───────────────────────────────────────────────────────── + +fn emit_generic(spec: &HarnessSpec) -> String { + let (pre_call, call_expr) = build_call(spec, &spec.entry_name); + format!( + r#"# Shape: generic module-level function. {pre_call} -# ── Call entry point ────────────────────────────────────────────────────────── try: _result = {call_expr} if _result is not None: @@ -437,46 +639,390 @@ try: except SystemExit as _e: sys.exit(_e.code) except Exception as _e: - # Print error to stderr so the oracle can observe error-based injection. print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) - -# Ensure probe fires for line-range matches on late-called sinks. -sys.settrace(None) -"#, - sink_file = sink_file, - sink_line = sink_line, - entry_module = entry_module, - pre_call = pre_call, - call_expr = call_expr, - probe = probe, +"# ) } +fn emit_cli(spec: &HarnessSpec) -> String { + let entry_module = module_name(&spec.entry_file); + let entry_fn = &spec.entry_name; + let argv_slot = match &spec.payload_slot { + PayloadSlot::Argv(idx) => *idx, + _ => 0, + }; + // Build argv: argv[0] = module name, argv[argv_slot+1] = payload. + format!( + r#"# Shape: CLI entry — drives `if __name__ == "__main__":` semantics. +_argv_payload_slot = {argv_slot} +_new_argv = [{module:?}] +for _i in range(_argv_payload_slot): + _new_argv.append("") +_new_argv.append(payload) +sys.argv = _new_argv +try: + # If module exposes an explicit `{entry_fn}` callable, prefer that. + _entry_callable = getattr(_entry_mod, "{entry_fn}", None) + if callable(_entry_callable): + _result = _entry_callable() + if _result is not None: + print(str(_result), flush=True) + else: + # Fall back to re-importing under `__main__` to fire the + # `if __name__ == "__main__":` block. + import runpy + runpy.run_module({module:?}, run_name="__main__") +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) +"#, + argv_slot = argv_slot, + module = entry_module, + entry_fn = entry_fn, + ) +} + +fn emit_pytest(spec: &HarnessSpec) -> String { + let entry_fn = &spec.entry_name; + // pytest functions usually take no args; the payload is injected via + // env var or by monkeypatching a request-builder. Default to + // env-var injection so the fixture can read `os.environ["PAYLOAD"]`. + let env_name = match &spec.payload_slot { + PayloadSlot::EnvVar(name) => name.clone(), + _ => "NYX_PAYLOAD".to_owned(), + }; + format!( + r#"# Shape: pytest function — drive the single test directly. +os.environ[{env_name:?}] = payload +try: + _result = _entry_mod.{entry_fn}() + if _result is not None: + try: + print(str(_result), flush=True) + except Exception: + pass +except AssertionError as _e: + # AssertionError is the typical pytest failure path; observable. + print(f"NYX_ASSERT: {{_e}}", file=sys.stderr, flush=True) +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) +"# + ) +} + +fn emit_async(spec: &HarnessSpec) -> String { + let entry_fn = &spec.entry_name; + let (pre_call, call_args) = build_call_args(spec); + format!( + r#"# Shape: async coroutine — wrap in asyncio.run. +import asyncio +{pre_call} +try: + _coro = _entry_mod.{entry_fn}({call_args}) + _result = asyncio.run(_coro) + if _result is not None: + try: + print(str(_result), flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) +"# + ) +} + +fn emit_celery(spec: &HarnessSpec) -> String { + let entry_fn = &spec.entry_name; + let (pre_call, call_args) = build_call_args(spec); + format!( + r#"# Shape: Celery task — call underlying function directly (eager). +{pre_call} +try: + _task = _entry_mod.{entry_fn} + # Celery tasks expose the underlying function via `.run` (always) and + # `.__wrapped__` (when the decorator preserves it). Prefer the + # underlying callable so we don't go through Celery's broker. + _fn = getattr(_task, "run", None) or getattr(_task, "__wrapped__", None) or _task + _result = _fn({call_args}) + if _result is not None: + try: + print(str(_result), flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) +"# + ) +} + +fn emit_flask(spec: &HarnessSpec) -> String { + let entry_fn = &spec.entry_name; + let (method, query_name, body_kind) = resolve_http_payload(&spec.payload_slot); + format!( + r#"# Shape: Flask route — dispatch via app.test_client(). +def _nyx_resolve_flask_app(mod): + from flask import Flask + candidates = [getattr(mod, n, None) for n in ("app", "application", "create_app")] + for c in candidates: + if callable(c) and not isinstance(c, Flask): + try: + got = c() + if isinstance(got, Flask): + return got + except TypeError: + pass + if isinstance(c, Flask): + return c + for attr in dir(mod): + val = getattr(mod, attr, None) + if isinstance(val, Flask): + return val + return None + +_app = _nyx_resolve_flask_app(_entry_mod) +if _app is None: + print("NYX_FLASK_APP_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(78) + +_route = None +for _r in _app.url_map.iter_rules(): + if _r.endpoint == {entry_fn:?} or _r.endpoint.endswith("." + {entry_fn:?}): + _route = _r + break +if _route is None: + # Fall back: any rule will do, but pick the first POST/GET. + _rules = list(_app.url_map.iter_rules()) + _route = _rules[0] if _rules else None +if _route is None: + print("NYX_FLASK_ROUTE_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(79) + +_path = _route.rule +# Strip route parameters; replace `` with payload when used as +# the path slot, otherwise with "x". +import re +if {body_kind:?} == "path": + _path = re.sub(r"<[^>]+>", payload, _path, count=1) +else: + _path = re.sub(r"<[^>]+>", "x", _path) + +_client = _app.test_client() +_method = {method:?} +_query = {{}} +_data = None +if {body_kind:?} == "query": + _query[{query_name:?}] = payload +elif {body_kind:?} == "body": + _data = payload +elif {body_kind:?} == "env": + os.environ[{query_name:?}] = payload +try: + _resp = _client.open(_path, method=_method, query_string=_query, data=_data) + try: + print(_resp.get_data(as_text=True), flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) +"# + ) +} + +fn emit_fastapi(spec: &HarnessSpec) -> String { + let entry_fn = &spec.entry_name; + let (method, query_name, body_kind) = resolve_http_payload(&spec.payload_slot); + format!( + r#"# Shape: FastAPI route — dispatch via starlette.testclient.TestClient. +def _nyx_resolve_fastapi_app(mod): + try: + from fastapi import FastAPI + except ImportError: + return None + for n in ("app", "application"): + v = getattr(mod, n, None) + if isinstance(v, FastAPI): + return v + for attr in dir(mod): + val = getattr(mod, attr, None) + if isinstance(val, FastAPI): + return val + return None + +_app = _nyx_resolve_fastapi_app(_entry_mod) +if _app is None: + print("NYX_FASTAPI_APP_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(78) + +try: + from starlette.testclient import TestClient +except ImportError: + print("NYX_FASTAPI_TESTCLIENT_MISSING", file=sys.stderr, flush=True) + sys.exit(79) + +_path = None +for _r in _app.routes: + _name = getattr(_r, "name", None) + _endpoint = getattr(_r, "endpoint", None) + _endpoint_name = getattr(_endpoint, "__name__", None) + if _name == {entry_fn:?} or _endpoint_name == {entry_fn:?}: + _path = getattr(_r, "path", None) + break +if _path is None and _app.routes: + _path = getattr(_app.routes[0], "path", None) +if _path is None: + print("NYX_FASTAPI_ROUTE_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(80) + +# Strip path parameters; replace `{{param}}` with the payload when used +# as the path slot, otherwise with "x". +import re +if {body_kind:?} == "path": + _path = re.sub(r"\{{[^}}]+\}}", payload, _path, count=1) +else: + _path = re.sub(r"\{{[^}}]+\}}", "x", _path) + +_client = TestClient(_app, raise_server_exceptions=False) +_method = {method:?} +_query = {{}} +_body = None +if {body_kind:?} == "query": + _query[{query_name:?}] = payload +elif {body_kind:?} == "body": + _body = payload +elif {body_kind:?} == "env": + os.environ[{query_name:?}] = payload +try: + _resp = _client.request(_method, _path, params=_query, content=_body) + try: + print(_resp.text, flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) +"# + ) +} + +fn emit_django(spec: &HarnessSpec) -> String { + let entry_fn = &spec.entry_name; + let (method, query_name, body_kind) = resolve_http_payload(&spec.payload_slot); + format!( + r#"# Shape: Django view — drive via RequestFactory. +def _nyx_django_setup(): + import django + from django.conf import settings + if not settings.configured: + settings.configure( + DEBUG=False, + DATABASES={{"default": {{"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:"}}}}, + INSTALLED_APPS=["django.contrib.contenttypes", "django.contrib.auth"], + ROOT_URLCONF=None, + ALLOWED_HOSTS=["*"], + SECRET_KEY="nyx-test-key", + USE_TZ=True, + ) + django.setup() + +_nyx_django_setup() +from django.test import RequestFactory + +_view = getattr(_entry_mod, {entry_fn:?}, None) +if _view is None: + # Try class-based view dispatch: find a class whose lowercased name + # matches {entry_fn:?}, instantiate it, and call as_view(). + for attr in dir(_entry_mod): + val = getattr(_entry_mod, attr, None) + if isinstance(val, type): + try: + _view = val.as_view() + break + except Exception: + pass +if _view is None: + print("NYX_DJANGO_VIEW_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(78) + +_factory = RequestFactory() +_path = "/" +_method = {method:?} +_query = {{}} +_data = None +if {body_kind:?} == "query": + _query[{query_name:?}] = payload +elif {body_kind:?} == "body": + _data = payload +elif {body_kind:?} == "env": + os.environ[{query_name:?}] = payload +_factory_method = getattr(_factory, _method.lower(), _factory.get) +_request = _factory_method(_path, data=_query or _data, content_type="text/plain" if _data else None) +try: + _resp = _view(_request) + try: + if hasattr(_resp, "render") and not getattr(_resp, "is_rendered", True): + _resp.render() + _content = getattr(_resp, "content", b"") + if isinstance(_content, (bytes, bytearray)): + _content = _content.decode("utf-8", "replace") + print(_content, flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) +"# + ) +} + +// ── Slot resolution helpers ────────────────────────────────────────────────── + /// Build `(pre_call_setup, call_expression)` for the chosen payload slot. -fn build_call(spec: &HarnessSpec, _module: &str, func: &str) -> (String, String) { +/// +/// Used by the [`PythonShape::Generic`] body. Other shapes build their +/// call shape inline because their entry contract differs (HTTP request, +/// asyncio coroutine, etc.). +fn build_call(spec: &HarnessSpec, func: &str) -> (String, String) { match &spec.payload_slot { PayloadSlot::Param(idx) => { - // Build positional args: put payload at index `idx`, fill others with "". - // For simplicity with unknown arities, pass payload as the first arg. let pre = String::new(); let call = if *idx == 0 { format!("_entry_mod.{func}(payload)") } else { - // Pad with empty strings up to idx, then payload. let pads = (0..*idx).map(|_| "\"\"").collect::>().join(", "); format!("_entry_mod.{func}({pads}, payload)") }; (pre, call) } PayloadSlot::EnvVar(name) => { - let pre = format!("os.environ[{name:?}] = payload\n"); - let call = format!("_entry_mod.{func}()"); - (pre, call) + // EnvVar can carry either a real env var (set before call, + // call takes no args) or a kwarg name (passed as kwarg). + // Heuristic: identifiers starting with lowercase that look + // like Python identifiers are kwargs; everything else is an + // env var. + if name.chars().next().map(|c| c.is_ascii_lowercase()).unwrap_or(false) { + let pre = String::new(); + let call = format!("_entry_mod.{func}({name}=payload)"); + (pre, call) + } else { + let pre = format!("os.environ[{name:?}] = payload\n"); + let call = format!("_entry_mod.{func}()"); + (pre, call) + } } PayloadSlot::Stdin => { - let pre = format!( - "import io\nsys.stdin = io.TextIOWrapper(io.BytesIO(_payload_raw))\n" - ); + let pre = "import io\nsys.stdin = io.TextIOWrapper(io.BytesIO(_payload_raw))\n" + .to_owned(); let call = format!("_entry_mod.{func}()"); (pre, call) } @@ -488,9 +1034,53 @@ fn build_call(spec: &HarnessSpec, _module: &str, func: &str) -> (String, String) } } +/// Variant of [`build_call`] that returns the bare argument list (no +/// `_entry_mod.` wrapper) so async / celery shapes can splice +/// custom call wrappers. +fn build_call_args(spec: &HarnessSpec) -> (String, String) { + match &spec.payload_slot { + PayloadSlot::Param(idx) => { + let pre = String::new(); + let args = if *idx == 0 { + "payload".to_owned() + } else { + let pads = (0..*idx).map(|_| "\"\"").collect::>().join(", "); + format!("{pads}, payload") + }; + (pre, args) + } + PayloadSlot::EnvVar(name) => { + if name.chars().next().map(|c| c.is_ascii_lowercase()).unwrap_or(false) { + (String::new(), format!("{name}=payload")) + } else { + let pre = format!("os.environ[{name:?}] = payload\n"); + (pre, String::new()) + } + } + PayloadSlot::Stdin => { + let pre = "import io\nsys.stdin = io.TextIOWrapper(io.BytesIO(_payload_raw))\n" + .to_owned(); + (pre, String::new()) + } + _ => (String::new(), "payload".to_owned()), + } +} + +/// Resolve `(http_method, query_or_env_name, body_kind)` from the +/// payload slot. `body_kind` is one of "query", "body", "env", +/// "path" — driving how the HTTP shapes wire the payload into the +/// request. +fn resolve_http_payload(slot: &PayloadSlot) -> (&'static str, String, &'static str) { + match slot { + PayloadSlot::QueryParam(name) => ("GET", name.clone(), "query"), + PayloadSlot::HttpBody => ("POST", String::new(), "body"), + PayloadSlot::EnvVar(name) => ("GET", name.clone(), "env"), + PayloadSlot::Param(_) => ("GET", "x".to_owned(), "path"), + _ => ("GET", "q".to_owned(), "query"), + } +} + /// Convert an entry file path to a Python module name. -/// -/// `"src/handlers/login.py"` → `"login"` (basename without extension). fn module_name(entry_file: &str) -> &str { let base = entry_file .rsplit('/') @@ -534,7 +1124,6 @@ mod tests { let harness = emit(&spec).unwrap(); assert!(harness.source.contains("sys.settrace")); assert!(harness.source.contains("__NYX_SINK_HIT__")); - assert!(harness.source.contains("event == \"line\"")); assert!(harness.source.contains("login(payload)")); assert_eq!(harness.filename, "harness.py"); } @@ -547,10 +1136,18 @@ mod tests { } #[test] - fn emit_env_var_slot() { + fn emit_env_var_slot_uppercase_sets_env() { let spec = make_spec(PayloadSlot::EnvVar("USER_INPUT".into())); let harness = emit(&spec).unwrap(); assert!(harness.source.contains("os.environ[\"USER_INPUT\"] = payload")); + assert!(harness.source.contains("login()")); + } + + #[test] + fn emit_env_var_lowercase_passes_kwarg() { + let spec = make_spec(PayloadSlot::EnvVar("query".into())); + let harness = emit(&spec).unwrap(); + assert!(harness.source.contains("login(query=payload)")); } #[test] @@ -561,41 +1158,166 @@ mod tests { } #[test] - fn entry_kinds_supported_is_non_empty() { - assert!(!PythonEmitter.entry_kinds_supported().is_empty()); - assert!(PythonEmitter - .entry_kinds_supported() - .contains(&EntryKind::Function)); + fn entry_kinds_supported_includes_http_and_cli() { + let kinds = PythonEmitter.entry_kinds_supported(); + assert!(kinds.contains(&EntryKind::Function)); + assert!(kinds.contains(&EntryKind::HttpRoute)); + assert!(kinds.contains(&EntryKind::CliSubcommand)); } #[test] - fn entry_kind_hint_names_attempted_and_phase() { - let hint = PythonEmitter.entry_kind_hint(EntryKind::HttpRoute); - assert!(hint.contains("HttpRoute")); - assert!(hint.contains("phase 12")); + fn entry_kind_hint_names_attempted() { + let hint = PythonEmitter.entry_kind_hint(EntryKind::LibraryApi); + assert!(hint.contains("LibraryApi")); } #[test] fn probe_shim_is_injected() { let spec = make_spec(PayloadSlot::Param(0)); let harness = emit(&spec).unwrap(); - assert!( - harness.source.contains("def __nyx_probe"), - "Phase 06 shim must be present in generated harness", - ); + assert!(harness.source.contains("def __nyx_probe")); assert!(harness.source.contains("NYX_PROBE_PATH")); } #[test] - fn unsupported_lang_returns_err() { - let mut spec = make_spec(PayloadSlot::Param(0)); - spec.lang = Lang::Rust; - // lang::emit handles the dispatch; test the python module directly - // by checking it only handles Python. - // We emit for Python directly here, not for Rust. - let harness = emit(&spec); - // python::emit doesn't check lang - it just generates code. - // The lang dispatch is in lang/mod.rs. - assert!(harness.is_ok()); + fn shape_detect_flask() { + let src = "from flask import Flask\napp = Flask(__name__)\n@app.route('/')\ndef index():\n pass\n"; + let spec = make_spec_with(EntryKind::HttpRoute, "index"); + assert_eq!(PythonShape::detect(&spec, src), PythonShape::FlaskRoute); + } + + #[test] + fn shape_detect_fastapi() { + let src = "from fastapi import FastAPI\napp = FastAPI()\n@app.get('/')\ndef index(): pass\n"; + let spec = make_spec_with(EntryKind::HttpRoute, "index"); + assert_eq!(PythonShape::detect(&spec, src), PythonShape::FastApiRoute); + } + + #[test] + fn shape_detect_django() { + let src = "from django.http import HttpResponse\ndef index(request): pass\n"; + let spec = make_spec_with(EntryKind::HttpRoute, "index"); + assert_eq!(PythonShape::detect(&spec, src), PythonShape::DjangoView); + } + + #[test] + fn shape_detect_cli() { + let src = "def main():\n pass\nif __name__ == \"__main__\":\n main()\n"; + let spec = make_spec_with(EntryKind::CliSubcommand, "main"); + assert_eq!(PythonShape::detect(&spec, src), PythonShape::CliEntry); + } + + #[test] + fn shape_detect_pytest() { + let src = "def test_login(): pass\n"; + let spec = make_spec_with(EntryKind::Function, "test_login"); + assert_eq!(PythonShape::detect(&spec, src), PythonShape::PytestFunction); + } + + #[test] + fn shape_detect_async() { + let src = "async def fetch_url(u): pass\n"; + let spec = make_spec_with(EntryKind::Function, "fetch_url"); + assert_eq!(PythonShape::detect(&spec, src), PythonShape::AsyncCoroutine); + } + + #[test] + fn shape_detect_celery() { + let src = "from celery import Celery\napp = Celery()\n@app.task\ndef run_job(x): pass\n"; + let spec = make_spec_with(EntryKind::Function, "run_job"); + assert_eq!(PythonShape::detect(&spec, src), PythonShape::CeleryTask); + } + + #[test] + fn shape_detect_generic_fallback() { + let src = "def login(name): pass\n"; + let spec = make_spec_with(EntryKind::Function, "login"); + assert_eq!(PythonShape::detect(&spec, src), PythonShape::Generic); + } + + #[test] + fn flask_shape_emits_test_client() { + let spec = make_spec_with(EntryKind::HttpRoute, "index"); + let src = generate_for_shape(&spec, PythonShape::FlaskRoute); + assert!(src.contains("app.test_client()")); + assert!(src.contains("from flask import Flask")); + } + + #[test] + fn fastapi_shape_emits_starlette_testclient() { + let spec = make_spec_with(EntryKind::HttpRoute, "index"); + let src = generate_for_shape(&spec, PythonShape::FastApiRoute); + assert!(src.contains("starlette.testclient")); + assert!(src.contains("TestClient")); + } + + #[test] + fn django_shape_emits_request_factory() { + let spec = make_spec_with(EntryKind::HttpRoute, "index"); + let src = generate_for_shape(&spec, PythonShape::DjangoView); + assert!(src.contains("RequestFactory")); + assert!(src.contains("settings.configure")); + } + + #[test] + fn cli_shape_sets_argv() { + let spec = make_spec_with(EntryKind::CliSubcommand, "main"); + let src = generate_for_shape(&spec, PythonShape::CliEntry); + assert!(src.contains("sys.argv =")); + assert!(src.contains("runpy")); + } + + #[test] + fn pytest_shape_sets_env_and_calls() { + let spec = make_spec_with(EntryKind::Function, "test_login"); + let src = generate_for_shape(&spec, PythonShape::PytestFunction); + assert!(src.contains("test_login()")); + assert!(src.contains("NYX_PAYLOAD")); + } + + #[test] + fn async_shape_wraps_asyncio_run() { + let spec = make_spec_with(EntryKind::Function, "fetch_url"); + let src = generate_for_shape(&spec, PythonShape::AsyncCoroutine); + assert!(src.contains("asyncio.run")); + assert!(src.contains("fetch_url(payload)")); + } + + #[test] + fn celery_shape_unwraps_task() { + let spec = make_spec_with(EntryKind::Function, "run_job"); + let src = generate_for_shape(&spec, PythonShape::CeleryTask); + assert!(src.contains("__wrapped__")); + assert!(src.contains("getattr(_task, \"run\"")); + } + + #[test] + fn http_shapes_pick_up_query_param_slot() { + let mut spec = make_spec_with(EntryKind::HttpRoute, "index"); + spec.payload_slot = PayloadSlot::QueryParam("q".into()); + let src = generate_for_shape(&spec, PythonShape::FlaskRoute); + assert!(src.contains("\"query\"")); + assert!(src.contains("\"q\"")); + } + + #[test] + fn extra_files_flask_pins_flask() { + let extras = extra_files_for_shape(PythonShape::FlaskRoute); + assert!(extras.iter().any(|(p, c)| p == "requirements.txt" && c.contains("Flask"))); + } + + #[test] + fn extra_files_fastapi_pins_httpx() { + let extras = extra_files_for_shape(PythonShape::FastApiRoute); + assert!(extras + .iter() + .any(|(p, c)| p == "requirements.txt" && c.contains("fastapi") && c.contains("httpx"))); + } + + fn make_spec_with(kind: EntryKind, name: &str) -> HarnessSpec { + let mut s = make_spec(PayloadSlot::Param(0)); + s.entry_kind = kind; + s.entry_name = name.to_owned(); + s } } diff --git a/tests/common/fixture_harness.rs b/tests/common/fixture_harness.rs index 97370914..f02c81a2 100644 --- a/tests/common/fixture_harness.rs +++ b/tests/common/fixture_harness.rs @@ -16,8 +16,8 @@ use nyx_scanner::commands::scan::Diag; use nyx_scanner::dynamic::verify::{verify_finding, VerifyOptions}; use nyx_scanner::evidence::{ - Confidence, Evidence, FlowStep, FlowStepKind, InconclusiveReason, UnsupportedReason, - VerifyResult, VerifyStatus, + Confidence, EntryKind, Evidence, FlowStep, FlowStepKind, InconclusiveReason, + UnsupportedReason, VerifyResult, VerifyStatus, }; use nyx_scanner::labels::Cap; use nyx_scanner::patterns::{FindingCategory, Severity}; @@ -192,6 +192,238 @@ fn stage_fixture(src: &Path, tmp: &TempDir, copy: CopyStrategy) -> PathBuf { } } +/// Phase 12 — per-shape acceptance helper. +/// +/// Stages `fixture_root//` into a tempdir, builds a +/// [`HarnessSpec`] with the caller's `entry_kind` / `payload_slot`, +/// then executes it through [`nyx_scanner::dynamic::runner::run_spec`] +/// directly. Returns a [`VerifyResult`]-shaped summary so callers can +/// reuse the same `assert_confirmed` / `assert_not_confirmed` helpers +/// the older golden-based suite uses. +/// +/// Bypasses [`verify_finding`] because the public verifier derives the +/// payload slot from the synthetic Diag's flow steps and always lands +/// on [`nyx_scanner::dynamic::spec::PayloadSlot::Param`], which the +/// HTTP / pytest / CLI shapes cannot honour. Going through the runner +/// directly lets the test pin the slot the spec under test actually +/// expects (e.g. [`nyx_scanner::dynamic::spec::PayloadSlot::QueryParam`] +/// for HTTP routes). +#[allow(clippy::too_many_arguments)] +pub fn run_shape_fixture( + shape_dir: &str, + file: &str, + func: &str, + cap: Cap, + sink_line: u32, + entry_kind: EntryKind, + payload_slot: nyx_scanner::dynamic::spec::PayloadSlot, +) -> VerifyResult { + use nyx_scanner::dynamic::runner::{run_spec, RunError}; + use nyx_scanner::dynamic::sandbox::SandboxOptions; + use nyx_scanner::dynamic::spec::{HarnessSpec, SpecDerivationStrategy}; + use nyx_scanner::symbol::Lang; + + let _guard = FIXTURE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + + let fixture_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/dynamic_fixtures/python") + .join(shape_dir); + let fixture_src = fixture_root.join(file); + + let tmp = TempDir::new().expect("create tempdir"); + let dst = tmp.path().join(file); + std::fs::copy(&fixture_src, &dst).expect("copy fixture into tempdir"); + + // SAFETY: env mutation is serialised by FIXTURE_LOCK and cleared at end. + unsafe { + std::env::set_var("NYX_REPRO_BASE", tmp.path().join("repro").to_str().unwrap()); + std::env::set_var( + "NYX_TELEMETRY_PATH", + tmp.path().join("events.jsonl").to_str().unwrap(), + ); + } + + let entry_file = dst.to_string_lossy().into_owned(); + // Per-fixture stable hash so workdir layout / cache key stays + // distinct between shapes and between vuln / benign fixtures. + let mut digest = blake3::Hasher::new(); + digest.update(shape_dir.as_bytes()); + digest.update(b"|"); + digest.update(file.as_bytes()); + let spec_hash = format!("{:016x}", { + let bytes = digest.finalize(); + u64::from_le_bytes(bytes.as_bytes()[..8].try_into().unwrap()) + }); + + let spec = HarnessSpec { + finding_id: spec_hash.clone(), + entry_file: entry_file.clone(), + entry_name: func.to_owned(), + entry_kind, + lang: Lang::Python, + toolchain_id: "python-3".into(), + payload_slot, + expected_cap: cap, + constraint_hints: vec![], + sink_file: entry_file, + sink_line, + spec_hash, + derivation: SpecDerivationStrategy::FromFlowSteps, + stubs_required: vec![], + }; + + let opts = SandboxOptions::default(); + let outcome = run_spec(&spec, &opts); + + unsafe { + std::env::remove_var("NYX_REPRO_BASE"); + std::env::remove_var("NYX_TELEMETRY_PATH"); + } + + // Project the [`RunOutcome`] / [`RunError`] back onto a + // [`VerifyResult`] shape so callers can assert against + // [`VerifyStatus`] directly without learning the runner's API. + match outcome { + Ok(run) => { + let status = if run.triggered_by.is_some() { + VerifyStatus::Confirmed + } else if run.oracle_collision { + VerifyStatus::Inconclusive + } else { + VerifyStatus::NotConfirmed + }; + VerifyResult { + finding_id: spec.finding_id.clone(), + status, + triggered_payload: run + .triggered_by + .and_then(|i| run.attempts.get(i)) + .map(|a| a.payload_label.to_owned()), + reason: None, + inconclusive_reason: None, + detail: None, + attempts: vec![], + toolchain_match: None, + differential: None, + } + } + Err(RunError::NoPayloadsForCap) => VerifyResult { + finding_id: spec.finding_id.clone(), + status: VerifyStatus::Unsupported, + triggered_payload: None, + reason: Some(UnsupportedReason::NoPayloadsForCap), + inconclusive_reason: None, + detail: None, + attempts: vec![], + toolchain_match: None, + differential: None, + }, + Err(e) => VerifyResult { + finding_id: spec.finding_id.clone(), + status: VerifyStatus::Inconclusive, + triggered_payload: None, + reason: None, + inconclusive_reason: None, + detail: Some(format!("{e:?}")), + attempts: vec![], + toolchain_match: None, + differential: None, + }, + } +} + +/// Phase 12 — golden harness snapshot. +/// +/// Stages `/` into a tempdir, builds a [`HarnessSpec`] for +/// the supplied entry kind / payload slot, emits the per-shape harness +/// via [`nyx_scanner::dynamic::lang::emit`], and either writes the +/// resulting source to `/.golden_harness.py` (under +/// `NYX_UPDATE_GOLDENS=1`) or diffs against the existing snapshot. The +/// emitter is deterministic, so the snapshot doubles as documentation +/// of the per-shape harness shape. +#[allow(clippy::too_many_arguments)] +pub fn run_harness_snapshot( + shape_dir: &str, + file: &str, + func: &str, + cap: Cap, + sink_line: u32, + entry_kind: EntryKind, + payload_slot: nyx_scanner::dynamic::spec::PayloadSlot, +) { + use nyx_scanner::dynamic::lang; + use nyx_scanner::dynamic::spec::{HarnessSpec, SpecDerivationStrategy}; + use nyx_scanner::symbol::Lang; + + let _guard = FIXTURE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + + let fixture_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/dynamic_fixtures/python") + .join(shape_dir); + let fixture_src = fixture_root.join(file); + let snapshot_path = fixture_root.join(format!("{file}.golden_harness.py")); + + // Stage into tempdir so the spec.entry_file path matches what the + // verifier sees at runtime. + let tmp = TempDir::new().expect("create tempdir"); + let dst = tmp.path().join(file); + std::fs::copy(&fixture_src, &dst).expect("copy fixture into tempdir"); + let entry_file = dst.to_string_lossy().into_owned(); + + let spec = HarnessSpec { + finding_id: "0000000000000001".into(), + entry_file: entry_file.clone(), + entry_name: func.to_owned(), + entry_kind, + lang: Lang::Python, + toolchain_id: "python-3".into(), + payload_slot, + expected_cap: cap, + constraint_hints: vec![], + sink_file: entry_file, + sink_line, + // Snapshot uses a fixed spec_hash so the emitted source stays + // stable; the runner regenerates the real hash at verify time. + spec_hash: "snapshotsnapshot".into(), + derivation: SpecDerivationStrategy::FromFlowSteps, + stubs_required: vec![], + }; + + let harness = lang::emit(&spec).expect("python emitter must produce a harness"); + + // Strip the tempdir prefix so the snapshot is stable across runs. + let tmp_prefix = tmp.path().to_string_lossy().into_owned(); + let normalised = harness + .source + .replace(&tmp_prefix, "") + .replace(file, ""); + + if std::env::var("NYX_UPDATE_GOLDENS").is_ok_and(|v| v == "1") { + std::fs::write(&snapshot_path, &normalised).unwrap_or_else(|e| { + panic!("write harness snapshot {}: {e}", snapshot_path.display()) + }); + return; + } + + let expected = std::fs::read_to_string(&snapshot_path).unwrap_or_else(|e| { + panic!( + "missing harness snapshot {}: {e}\n\ + current harness source:\n{normalised}\n\ + rerun with NYX_UPDATE_GOLDENS=1 to seed it.", + snapshot_path.display() + ) + }); + + if expected != normalised { + panic!( + "harness snapshot drift for {shape_dir}/{file}:\n\ + ---- expected ----\n{expected}\n\ + ---- actual ----\n{normalised}\n\ + rerun with NYX_UPDATE_GOLDENS=1 if intended." + ); + } +} + fn make_diag(path: &Path, func: &str, cap: Cap, sink_line: u32) -> Diag { let path_str = path.to_string_lossy().into_owned(); let evidence = Evidence { diff --git a/tests/dynamic_fixtures/python/async/benign.py b/tests/dynamic_fixtures/python/async/benign.py new file mode 100644 index 00000000..028f6759 --- /dev/null +++ b/tests/dynamic_fixtures/python/async/benign.py @@ -0,0 +1,22 @@ +"""Phase 12 — async coroutine, benign.""" +import asyncio +import re +import subprocess + +_VALID_HOST = re.compile(r"^[A-Za-z0-9.-]{1,253}$") + + +async def run_ping(host): + await asyncio.sleep(0) + if not _VALID_HOST.fullmatch(host or ""): + print("invalid host") + return + result = subprocess.run( + ["ping", "-c", "1", host], + shell=False, + capture_output=True, + text=True, + timeout=5, + ) + print(result.stdout) + print(result.stderr, end="") diff --git a/tests/dynamic_fixtures/python/async/vuln.py b/tests/dynamic_fixtures/python/async/vuln.py new file mode 100644 index 00000000..0f226aa7 --- /dev/null +++ b/tests/dynamic_fixtures/python/async/vuln.py @@ -0,0 +1,21 @@ +"""Phase 12 — async coroutine, vulnerable. + +`async def` coroutine that shells out with concatenated user input. +Nyx harness wraps the call in `asyncio.run`. +""" +import asyncio +import subprocess + + +async def run_ping(host): + """Vulnerable async coroutine.""" + await asyncio.sleep(0) + result = subprocess.run( + "ping -c 1 " + host, + shell=True, + capture_output=True, + text=True, + timeout=5, + ) + print(result.stdout) + print(result.stderr, end="") diff --git a/tests/dynamic_fixtures/python/async/vuln.py.golden_harness.py b/tests/dynamic_fixtures/python/async/vuln.py.golden_harness.py new file mode 100644 index 00000000..8db32082 --- /dev/null +++ b/tests/dynamic_fixtures/python/async/vuln.py.golden_harness.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +"""Nyx dynamic harness — auto-generated, do not edit.""" +import os +import sys +import traceback + +# ── Sink-reachability probe (sys.settrace) ──────────────────────────────────── + +# ── __nyx_probe shim (Phase 06 — Track C.1, Phase 08 — Track C.4 + C.5) ────── +# Deny-substring list mirrors crate::dynamic::policy::DENY_KEY_SUBSTRINGS; keep +# in sync when the host-side policy gains new entries. +_NYX_DENY_SUBSTRINGS = ( + "TOKEN", "SECRET", "PASSWORD", "PASSWD", "API_KEY", "APIKEY", + "PRIVATE_KEY", "CREDENTIAL", "SESSION", "COOKIE", "AUTH", "BEARER", + "AWS_ACCESS", "AWS_SESSION", "GH_TOKEN", "GITHUB_TOKEN", "NPM_TOKEN", + "PYPI_TOKEN", "DOCKER_PASS", +) +_NYX_PAYLOAD_LIMIT = 16 * 1024 +_NYX_REDACTED = "" + +def __nyx_scrub_env(): + import os + out = {} + for k, v in os.environ.items(): + ku = str(k).upper() + if any(n in ku for n in _NYX_DENY_SUBSTRINGS): + out[k] = _NYX_REDACTED + else: + out[k] = v + return out + +def __nyx_witness(sink_callee, args): + import os + payload = os.environ.get("NYX_PAYLOAD", "") + payload_bytes = payload.encode("utf-8", "replace") if isinstance(payload, str) else bytes(payload) + if len(payload_bytes) > _NYX_PAYLOAD_LIMIT: + payload_bytes = payload_bytes[:_NYX_PAYLOAD_LIMIT] + args_repr = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + args_repr.append("" % len(a)) + else: + args_repr.append(str(a)) + try: + cwd = os.getcwd() + except OSError: + cwd = "" + return { + "env_snapshot": __nyx_scrub_env(), + "cwd": cwd, + "payload_bytes": list(payload_bytes), + "callee": str(sink_callee), + "args_repr": args_repr, + } + +def __nyx_emit(rec): + import os, json + p = os.environ.get("NYX_PROBE_PATH") + if not p: + return + try: + with open(p, "a") as _f: + _f.write(json.dumps(rec) + "\n") + except OSError: + pass + +def __nyx_probe(sink_callee, *args): + import os, time + serialised = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + serialised.append({"kind": "Bytes", "value": list(a)}) + elif isinstance(a, bool): + serialised.append({"kind": "Int", "value": 1 if a else 0}) + elif isinstance(a, int): + serialised.append({"kind": "Int", "value": a}) + else: + serialised.append({"kind": "String", "value": str(a)}) + rec = { + "sink_callee": str(sink_callee), + "args": serialised, + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Normal"}, + "witness": __nyx_witness(sink_callee, args), + } + __nyx_emit(rec) + +# Phase 08: sink-site signal handler. Call __nyx_install_crash_guard before +# invoking the instrumented sink so a SIGSEGV / SIGABRT / etc. is captured as +# a Crash probe (with witness) before the process aborts. The shim re-raises +# the signal on the default handler after writing so process-level outcome +# observers (exit_code) still see the death. +_NYX_SIGNAL_NAMES = {} + +def __nyx_install_crash_guard(sink_callee): + import signal, os, time + catchable = [] + for nm in ("SIGSEGV", "SIGABRT", "SIGBUS", "SIGFPE", "SIGILL"): + s = getattr(signal, nm, None) + if s is not None: + catchable.append((nm, s)) + _NYX_SIGNAL_NAMES[s] = nm + def _handler(signum, frame): + nm = _NYX_SIGNAL_NAMES.get(signum, "SIG?") + rec = { + "sink_callee": str(sink_callee), + "args": [], + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Crash", "signal": nm}, + "witness": __nyx_witness(sink_callee, []), + } + __nyx_emit(rec) + # Reset to default and re-raise so the process actually dies. + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + for _nm, s in catchable: + try: + signal.signal(s, _handler) + except (OSError, ValueError): + pass + + +_NYX_SINK_FILE = "/" +_NYX_SINK_LINE = 13 +_NYX_SINK_HIT = False + +def _nyx_tracer(frame, event, arg): + global _NYX_SINK_HIT + if not _NYX_SINK_HIT and event == "line": + fname = frame.f_code.co_filename + if fname == _NYX_SINK_FILE or fname.endswith(_NYX_SINK_FILE) or ( + os.path.basename(fname) == os.path.basename(_NYX_SINK_FILE) + ): + if _NYX_SINK_LINE <= frame.f_lineno <= _NYX_SINK_LINE + 5: + _NYX_SINK_HIT = True + print("__NYX_SINK_HIT__", flush=True) + return _nyx_tracer + +sys.settrace(_nyx_tracer) + +# ── Payload loading ──────────────────────────────────────────────────────────── +_payload_raw = os.environb.get(b"NYX_PAYLOAD", b"") +if not _payload_raw: + import base64 + _payload_b64 = os.environ.get("NYX_PAYLOAD_B64", "") + if _payload_b64: + _payload_raw = base64.b64decode(_payload_b64) +try: + payload = _payload_raw.decode("utf-8") +except UnicodeDecodeError: + payload = _payload_raw.decode("latin-1") + +# ── Entry module import ──────────────────────────────────────────────────────── +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, ".") +try: + import vuln as _entry_mod +except ImportError as _e: + print(f"NYX_IMPORT_ERROR: {_e}", file=sys.stderr, flush=True) + sys.exit(77) + +# Shape: async coroutine — wrap in asyncio.run. +import asyncio + +try: + _coro = _entry_mod.run_ping(payload) + _result = asyncio.run(_coro) + if _result is not None: + try: + print(str(_result), flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {type(_e).__name__}: {_e}", file=sys.stderr, flush=True) + +sys.settrace(None) diff --git a/tests/dynamic_fixtures/python/celery/benign.py b/tests/dynamic_fixtures/python/celery/benign.py new file mode 100644 index 00000000..df23f985 --- /dev/null +++ b/tests/dynamic_fixtures/python/celery/benign.py @@ -0,0 +1,25 @@ +"""Phase 12 — Celery task, benign.""" +import re +import subprocess + +from celery import Celery + +app = Celery("nyx_fixture") + +_VALID_HOST = re.compile(r"^[A-Za-z0-9.-]{1,253}$") + + +@app.task +def run_job(host): + if not _VALID_HOST.fullmatch(host or ""): + print("invalid host") + return + result = subprocess.run( + ["ping", "-c", "1", host], + shell=False, + capture_output=True, + text=True, + timeout=5, + ) + print(result.stdout) + print(result.stderr, end="") diff --git a/tests/dynamic_fixtures/python/celery/vuln.py b/tests/dynamic_fixtures/python/celery/vuln.py new file mode 100644 index 00000000..c098fbfb --- /dev/null +++ b/tests/dynamic_fixtures/python/celery/vuln.py @@ -0,0 +1,25 @@ +"""Phase 12 — Celery task, vulnerable. + +Celery's `@app.task` decorator wraps the underlying function on a Task +object. Nyx harness reaches the inner callable via `.run` / +`.__wrapped__` so no broker is required. +""" +import subprocess + +from celery import Celery + +app = Celery("nyx_fixture") + + +@app.task +def run_job(host): + """Vulnerable Celery task body.""" + result = subprocess.run( + "ping -c 1 " + host, + shell=True, + capture_output=True, + text=True, + timeout=5, + ) + print(result.stdout) + print(result.stderr, end="") diff --git a/tests/dynamic_fixtures/python/celery/vuln.py.golden_harness.py b/tests/dynamic_fixtures/python/celery/vuln.py.golden_harness.py new file mode 100644 index 00000000..b51c4d56 --- /dev/null +++ b/tests/dynamic_fixtures/python/celery/vuln.py.golden_harness.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +"""Nyx dynamic harness — auto-generated, do not edit.""" +import os +import sys +import traceback + +# ── Sink-reachability probe (sys.settrace) ──────────────────────────────────── + +# ── __nyx_probe shim (Phase 06 — Track C.1, Phase 08 — Track C.4 + C.5) ────── +# Deny-substring list mirrors crate::dynamic::policy::DENY_KEY_SUBSTRINGS; keep +# in sync when the host-side policy gains new entries. +_NYX_DENY_SUBSTRINGS = ( + "TOKEN", "SECRET", "PASSWORD", "PASSWD", "API_KEY", "APIKEY", + "PRIVATE_KEY", "CREDENTIAL", "SESSION", "COOKIE", "AUTH", "BEARER", + "AWS_ACCESS", "AWS_SESSION", "GH_TOKEN", "GITHUB_TOKEN", "NPM_TOKEN", + "PYPI_TOKEN", "DOCKER_PASS", +) +_NYX_PAYLOAD_LIMIT = 16 * 1024 +_NYX_REDACTED = "" + +def __nyx_scrub_env(): + import os + out = {} + for k, v in os.environ.items(): + ku = str(k).upper() + if any(n in ku for n in _NYX_DENY_SUBSTRINGS): + out[k] = _NYX_REDACTED + else: + out[k] = v + return out + +def __nyx_witness(sink_callee, args): + import os + payload = os.environ.get("NYX_PAYLOAD", "") + payload_bytes = payload.encode("utf-8", "replace") if isinstance(payload, str) else bytes(payload) + if len(payload_bytes) > _NYX_PAYLOAD_LIMIT: + payload_bytes = payload_bytes[:_NYX_PAYLOAD_LIMIT] + args_repr = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + args_repr.append("" % len(a)) + else: + args_repr.append(str(a)) + try: + cwd = os.getcwd() + except OSError: + cwd = "" + return { + "env_snapshot": __nyx_scrub_env(), + "cwd": cwd, + "payload_bytes": list(payload_bytes), + "callee": str(sink_callee), + "args_repr": args_repr, + } + +def __nyx_emit(rec): + import os, json + p = os.environ.get("NYX_PROBE_PATH") + if not p: + return + try: + with open(p, "a") as _f: + _f.write(json.dumps(rec) + "\n") + except OSError: + pass + +def __nyx_probe(sink_callee, *args): + import os, time + serialised = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + serialised.append({"kind": "Bytes", "value": list(a)}) + elif isinstance(a, bool): + serialised.append({"kind": "Int", "value": 1 if a else 0}) + elif isinstance(a, int): + serialised.append({"kind": "Int", "value": a}) + else: + serialised.append({"kind": "String", "value": str(a)}) + rec = { + "sink_callee": str(sink_callee), + "args": serialised, + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Normal"}, + "witness": __nyx_witness(sink_callee, args), + } + __nyx_emit(rec) + +# Phase 08: sink-site signal handler. Call __nyx_install_crash_guard before +# invoking the instrumented sink so a SIGSEGV / SIGABRT / etc. is captured as +# a Crash probe (with witness) before the process aborts. The shim re-raises +# the signal on the default handler after writing so process-level outcome +# observers (exit_code) still see the death. +_NYX_SIGNAL_NAMES = {} + +def __nyx_install_crash_guard(sink_callee): + import signal, os, time + catchable = [] + for nm in ("SIGSEGV", "SIGABRT", "SIGBUS", "SIGFPE", "SIGILL"): + s = getattr(signal, nm, None) + if s is not None: + catchable.append((nm, s)) + _NYX_SIGNAL_NAMES[s] = nm + def _handler(signum, frame): + nm = _NYX_SIGNAL_NAMES.get(signum, "SIG?") + rec = { + "sink_callee": str(sink_callee), + "args": [], + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Crash", "signal": nm}, + "witness": __nyx_witness(sink_callee, []), + } + __nyx_emit(rec) + # Reset to default and re-raise so the process actually dies. + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + for _nm, s in catchable: + try: + signal.signal(s, _handler) + except (OSError, ValueError): + pass + + +_NYX_SINK_FILE = "/" +_NYX_SINK_LINE = 17 +_NYX_SINK_HIT = False + +def _nyx_tracer(frame, event, arg): + global _NYX_SINK_HIT + if not _NYX_SINK_HIT and event == "line": + fname = frame.f_code.co_filename + if fname == _NYX_SINK_FILE or fname.endswith(_NYX_SINK_FILE) or ( + os.path.basename(fname) == os.path.basename(_NYX_SINK_FILE) + ): + if _NYX_SINK_LINE <= frame.f_lineno <= _NYX_SINK_LINE + 5: + _NYX_SINK_HIT = True + print("__NYX_SINK_HIT__", flush=True) + return _nyx_tracer + +sys.settrace(_nyx_tracer) + +# ── Payload loading ──────────────────────────────────────────────────────────── +_payload_raw = os.environb.get(b"NYX_PAYLOAD", b"") +if not _payload_raw: + import base64 + _payload_b64 = os.environ.get("NYX_PAYLOAD_B64", "") + if _payload_b64: + _payload_raw = base64.b64decode(_payload_b64) +try: + payload = _payload_raw.decode("utf-8") +except UnicodeDecodeError: + payload = _payload_raw.decode("latin-1") + +# ── Entry module import ──────────────────────────────────────────────────────── +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, ".") +try: + import vuln as _entry_mod +except ImportError as _e: + print(f"NYX_IMPORT_ERROR: {_e}", file=sys.stderr, flush=True) + sys.exit(77) + +# Shape: Celery task — call underlying function directly (eager). + +try: + _task = _entry_mod.run_job + # Celery tasks expose the underlying function via `.run` (always) and + # `.__wrapped__` (when the decorator preserves it). Prefer the + # underlying callable so we don't go through Celery's broker. + _fn = getattr(_task, "run", None) or getattr(_task, "__wrapped__", None) or _task + _result = _fn(payload) + if _result is not None: + try: + print(str(_result), flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {type(_e).__name__}: {_e}", file=sys.stderr, flush=True) + +sys.settrace(None) diff --git a/tests/dynamic_fixtures/python/cli/benign.py b/tests/dynamic_fixtures/python/cli/benign.py new file mode 100644 index 00000000..a74a5342 --- /dev/null +++ b/tests/dynamic_fixtures/python/cli/benign.py @@ -0,0 +1,26 @@ +"""Phase 12 — CLI shape, benign.""" +import re +import subprocess +import sys + +_VALID_HOST = re.compile(r"^[A-Za-z0-9.-]{1,253}$") + + +def main(): + host = sys.argv[1] if len(sys.argv) > 1 else "" + if not _VALID_HOST.fullmatch(host): + print("invalid host") + return + result = subprocess.run( + ["ping", "-c", "1", host], + shell=False, + capture_output=True, + text=True, + timeout=5, + ) + print(result.stdout) + print(result.stderr, end="") + + +if __name__ == "__main__": + main() diff --git a/tests/dynamic_fixtures/python/cli/vuln.py b/tests/dynamic_fixtures/python/cli/vuln.py new file mode 100644 index 00000000..433ee61b --- /dev/null +++ b/tests/dynamic_fixtures/python/cli/vuln.py @@ -0,0 +1,26 @@ +"""Phase 12 — CLI shape, vulnerable. + +Driven via `if __name__ == "__main__":` — Nyx harness sets +`sys.argv[1]` to the payload and either calls `main()` or +`runpy.run_module(..., run_name="__main__")` to fire the guard block. +""" +import subprocess +import sys + + +def main(): + """Vulnerable: read host from argv[1] and shell out.""" + host = sys.argv[1] if len(sys.argv) > 1 else "" + result = subprocess.run( + "ping -c 1 " + host, + shell=True, + capture_output=True, + text=True, + timeout=5, + ) + print(result.stdout) + print(result.stderr, end="") + + +if __name__ == "__main__": + main() diff --git a/tests/dynamic_fixtures/python/cli/vuln.py.golden_harness.py b/tests/dynamic_fixtures/python/cli/vuln.py.golden_harness.py new file mode 100644 index 00000000..df3fe3fc --- /dev/null +++ b/tests/dynamic_fixtures/python/cli/vuln.py.golden_harness.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +"""Nyx dynamic harness — auto-generated, do not edit.""" +import os +import sys +import traceback + +# ── Sink-reachability probe (sys.settrace) ──────────────────────────────────── + +# ── __nyx_probe shim (Phase 06 — Track C.1, Phase 08 — Track C.4 + C.5) ────── +# Deny-substring list mirrors crate::dynamic::policy::DENY_KEY_SUBSTRINGS; keep +# in sync when the host-side policy gains new entries. +_NYX_DENY_SUBSTRINGS = ( + "TOKEN", "SECRET", "PASSWORD", "PASSWD", "API_KEY", "APIKEY", + "PRIVATE_KEY", "CREDENTIAL", "SESSION", "COOKIE", "AUTH", "BEARER", + "AWS_ACCESS", "AWS_SESSION", "GH_TOKEN", "GITHUB_TOKEN", "NPM_TOKEN", + "PYPI_TOKEN", "DOCKER_PASS", +) +_NYX_PAYLOAD_LIMIT = 16 * 1024 +_NYX_REDACTED = "" + +def __nyx_scrub_env(): + import os + out = {} + for k, v in os.environ.items(): + ku = str(k).upper() + if any(n in ku for n in _NYX_DENY_SUBSTRINGS): + out[k] = _NYX_REDACTED + else: + out[k] = v + return out + +def __nyx_witness(sink_callee, args): + import os + payload = os.environ.get("NYX_PAYLOAD", "") + payload_bytes = payload.encode("utf-8", "replace") if isinstance(payload, str) else bytes(payload) + if len(payload_bytes) > _NYX_PAYLOAD_LIMIT: + payload_bytes = payload_bytes[:_NYX_PAYLOAD_LIMIT] + args_repr = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + args_repr.append("" % len(a)) + else: + args_repr.append(str(a)) + try: + cwd = os.getcwd() + except OSError: + cwd = "" + return { + "env_snapshot": __nyx_scrub_env(), + "cwd": cwd, + "payload_bytes": list(payload_bytes), + "callee": str(sink_callee), + "args_repr": args_repr, + } + +def __nyx_emit(rec): + import os, json + p = os.environ.get("NYX_PROBE_PATH") + if not p: + return + try: + with open(p, "a") as _f: + _f.write(json.dumps(rec) + "\n") + except OSError: + pass + +def __nyx_probe(sink_callee, *args): + import os, time + serialised = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + serialised.append({"kind": "Bytes", "value": list(a)}) + elif isinstance(a, bool): + serialised.append({"kind": "Int", "value": 1 if a else 0}) + elif isinstance(a, int): + serialised.append({"kind": "Int", "value": a}) + else: + serialised.append({"kind": "String", "value": str(a)}) + rec = { + "sink_callee": str(sink_callee), + "args": serialised, + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Normal"}, + "witness": __nyx_witness(sink_callee, args), + } + __nyx_emit(rec) + +# Phase 08: sink-site signal handler. Call __nyx_install_crash_guard before +# invoking the instrumented sink so a SIGSEGV / SIGABRT / etc. is captured as +# a Crash probe (with witness) before the process aborts. The shim re-raises +# the signal on the default handler after writing so process-level outcome +# observers (exit_code) still see the death. +_NYX_SIGNAL_NAMES = {} + +def __nyx_install_crash_guard(sink_callee): + import signal, os, time + catchable = [] + for nm in ("SIGSEGV", "SIGABRT", "SIGBUS", "SIGFPE", "SIGILL"): + s = getattr(signal, nm, None) + if s is not None: + catchable.append((nm, s)) + _NYX_SIGNAL_NAMES[s] = nm + def _handler(signum, frame): + nm = _NYX_SIGNAL_NAMES.get(signum, "SIG?") + rec = { + "sink_callee": str(sink_callee), + "args": [], + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Crash", "signal": nm}, + "witness": __nyx_witness(sink_callee, []), + } + __nyx_emit(rec) + # Reset to default and re-raise so the process actually dies. + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + for _nm, s in catchable: + try: + signal.signal(s, _handler) + except (OSError, ValueError): + pass + + +_NYX_SINK_FILE = "/" +_NYX_SINK_LINE = 14 +_NYX_SINK_HIT = False + +def _nyx_tracer(frame, event, arg): + global _NYX_SINK_HIT + if not _NYX_SINK_HIT and event == "line": + fname = frame.f_code.co_filename + if fname == _NYX_SINK_FILE or fname.endswith(_NYX_SINK_FILE) or ( + os.path.basename(fname) == os.path.basename(_NYX_SINK_FILE) + ): + if _NYX_SINK_LINE <= frame.f_lineno <= _NYX_SINK_LINE + 5: + _NYX_SINK_HIT = True + print("__NYX_SINK_HIT__", flush=True) + return _nyx_tracer + +sys.settrace(_nyx_tracer) + +# ── Payload loading ──────────────────────────────────────────────────────────── +_payload_raw = os.environb.get(b"NYX_PAYLOAD", b"") +if not _payload_raw: + import base64 + _payload_b64 = os.environ.get("NYX_PAYLOAD_B64", "") + if _payload_b64: + _payload_raw = base64.b64decode(_payload_b64) +try: + payload = _payload_raw.decode("utf-8") +except UnicodeDecodeError: + payload = _payload_raw.decode("latin-1") + +# ── Entry module import ──────────────────────────────────────────────────────── +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, ".") +try: + import vuln as _entry_mod +except ImportError as _e: + print(f"NYX_IMPORT_ERROR: {_e}", file=sys.stderr, flush=True) + sys.exit(77) + +# Shape: CLI entry — drives `if __name__ == "__main__":` semantics. +_argv_payload_slot = 0 +_new_argv = ["vuln"] +for _i in range(_argv_payload_slot): + _new_argv.append("") +_new_argv.append(payload) +sys.argv = _new_argv +try: + # If module exposes an explicit `main` callable, prefer that. + _entry_callable = getattr(_entry_mod, "main", None) + if callable(_entry_callable): + _result = _entry_callable() + if _result is not None: + print(str(_result), flush=True) + else: + # Fall back to re-importing under `__main__` to fire the + # `if __name__ == "__main__":` block. + import runpy + runpy.run_module("vuln", run_name="__main__") +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {type(_e).__name__}: {_e}", file=sys.stderr, flush=True) + +sys.settrace(None) diff --git a/tests/dynamic_fixtures/python/django/benign.py b/tests/dynamic_fixtures/python/django/benign.py new file mode 100644 index 00000000..5b7c9c1a --- /dev/null +++ b/tests/dynamic_fixtures/python/django/benign.py @@ -0,0 +1,21 @@ +"""Phase 12 — Django view, benign.""" +import re +import subprocess + +from django.http import HttpResponse + +_VALID_HOST = re.compile(r"^[A-Za-z0-9.-]{1,253}$") + + +def ping(request): + host = request.GET.get("host", "") + if not _VALID_HOST.fullmatch(host): + return HttpResponse("invalid host") + result = subprocess.run( + ["ping", "-c", "1", host], + shell=False, + capture_output=True, + text=True, + timeout=5, + ) + return HttpResponse(result.stdout + result.stderr) diff --git a/tests/dynamic_fixtures/python/django/vuln.py b/tests/dynamic_fixtures/python/django/vuln.py new file mode 100644 index 00000000..4b79ed7b --- /dev/null +++ b/tests/dynamic_fixtures/python/django/vuln.py @@ -0,0 +1,22 @@ +"""Phase 12 — Django view, vulnerable. + +Function-based view driven via `django.test.RequestFactory`. The +harness configures a minimal Django settings module at runtime so the +view can be called without a project layout. +""" +import subprocess + +from django.http import HttpResponse + + +def ping(request): + """Vulnerable: query parameter flows to subprocess(shell=True).""" + host = request.GET.get("host", "") + result = subprocess.run( + "ping -c 1 " + host, + shell=True, + capture_output=True, + text=True, + timeout=5, + ) + return HttpResponse(result.stdout + result.stderr) diff --git a/tests/dynamic_fixtures/python/django/vuln.py.golden_harness.py b/tests/dynamic_fixtures/python/django/vuln.py.golden_harness.py new file mode 100644 index 00000000..cfa61d2d --- /dev/null +++ b/tests/dynamic_fixtures/python/django/vuln.py.golden_harness.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +"""Nyx dynamic harness — auto-generated, do not edit.""" +import os +import sys +import traceback + +# ── Sink-reachability probe (sys.settrace) ──────────────────────────────────── + +# ── __nyx_probe shim (Phase 06 — Track C.1, Phase 08 — Track C.4 + C.5) ────── +# Deny-substring list mirrors crate::dynamic::policy::DENY_KEY_SUBSTRINGS; keep +# in sync when the host-side policy gains new entries. +_NYX_DENY_SUBSTRINGS = ( + "TOKEN", "SECRET", "PASSWORD", "PASSWD", "API_KEY", "APIKEY", + "PRIVATE_KEY", "CREDENTIAL", "SESSION", "COOKIE", "AUTH", "BEARER", + "AWS_ACCESS", "AWS_SESSION", "GH_TOKEN", "GITHUB_TOKEN", "NPM_TOKEN", + "PYPI_TOKEN", "DOCKER_PASS", +) +_NYX_PAYLOAD_LIMIT = 16 * 1024 +_NYX_REDACTED = "" + +def __nyx_scrub_env(): + import os + out = {} + for k, v in os.environ.items(): + ku = str(k).upper() + if any(n in ku for n in _NYX_DENY_SUBSTRINGS): + out[k] = _NYX_REDACTED + else: + out[k] = v + return out + +def __nyx_witness(sink_callee, args): + import os + payload = os.environ.get("NYX_PAYLOAD", "") + payload_bytes = payload.encode("utf-8", "replace") if isinstance(payload, str) else bytes(payload) + if len(payload_bytes) > _NYX_PAYLOAD_LIMIT: + payload_bytes = payload_bytes[:_NYX_PAYLOAD_LIMIT] + args_repr = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + args_repr.append("" % len(a)) + else: + args_repr.append(str(a)) + try: + cwd = os.getcwd() + except OSError: + cwd = "" + return { + "env_snapshot": __nyx_scrub_env(), + "cwd": cwd, + "payload_bytes": list(payload_bytes), + "callee": str(sink_callee), + "args_repr": args_repr, + } + +def __nyx_emit(rec): + import os, json + p = os.environ.get("NYX_PROBE_PATH") + if not p: + return + try: + with open(p, "a") as _f: + _f.write(json.dumps(rec) + "\n") + except OSError: + pass + +def __nyx_probe(sink_callee, *args): + import os, time + serialised = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + serialised.append({"kind": "Bytes", "value": list(a)}) + elif isinstance(a, bool): + serialised.append({"kind": "Int", "value": 1 if a else 0}) + elif isinstance(a, int): + serialised.append({"kind": "Int", "value": a}) + else: + serialised.append({"kind": "String", "value": str(a)}) + rec = { + "sink_callee": str(sink_callee), + "args": serialised, + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Normal"}, + "witness": __nyx_witness(sink_callee, args), + } + __nyx_emit(rec) + +# Phase 08: sink-site signal handler. Call __nyx_install_crash_guard before +# invoking the instrumented sink so a SIGSEGV / SIGABRT / etc. is captured as +# a Crash probe (with witness) before the process aborts. The shim re-raises +# the signal on the default handler after writing so process-level outcome +# observers (exit_code) still see the death. +_NYX_SIGNAL_NAMES = {} + +def __nyx_install_crash_guard(sink_callee): + import signal, os, time + catchable = [] + for nm in ("SIGSEGV", "SIGABRT", "SIGBUS", "SIGFPE", "SIGILL"): + s = getattr(signal, nm, None) + if s is not None: + catchable.append((nm, s)) + _NYX_SIGNAL_NAMES[s] = nm + def _handler(signum, frame): + nm = _NYX_SIGNAL_NAMES.get(signum, "SIG?") + rec = { + "sink_callee": str(sink_callee), + "args": [], + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Crash", "signal": nm}, + "witness": __nyx_witness(sink_callee, []), + } + __nyx_emit(rec) + # Reset to default and re-raise so the process actually dies. + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + for _nm, s in catchable: + try: + signal.signal(s, _handler) + except (OSError, ValueError): + pass + + +_NYX_SINK_FILE = "/" +_NYX_SINK_LINE = 15 +_NYX_SINK_HIT = False + +def _nyx_tracer(frame, event, arg): + global _NYX_SINK_HIT + if not _NYX_SINK_HIT and event == "line": + fname = frame.f_code.co_filename + if fname == _NYX_SINK_FILE or fname.endswith(_NYX_SINK_FILE) or ( + os.path.basename(fname) == os.path.basename(_NYX_SINK_FILE) + ): + if _NYX_SINK_LINE <= frame.f_lineno <= _NYX_SINK_LINE + 5: + _NYX_SINK_HIT = True + print("__NYX_SINK_HIT__", flush=True) + return _nyx_tracer + +sys.settrace(_nyx_tracer) + +# ── Payload loading ──────────────────────────────────────────────────────────── +_payload_raw = os.environb.get(b"NYX_PAYLOAD", b"") +if not _payload_raw: + import base64 + _payload_b64 = os.environ.get("NYX_PAYLOAD_B64", "") + if _payload_b64: + _payload_raw = base64.b64decode(_payload_b64) +try: + payload = _payload_raw.decode("utf-8") +except UnicodeDecodeError: + payload = _payload_raw.decode("latin-1") + +# ── Entry module import ──────────────────────────────────────────────────────── +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, ".") +try: + import vuln as _entry_mod +except ImportError as _e: + print(f"NYX_IMPORT_ERROR: {_e}", file=sys.stderr, flush=True) + sys.exit(77) + +# Shape: Django view — drive via RequestFactory. +def _nyx_django_setup(): + import django + from django.conf import settings + if not settings.configured: + settings.configure( + DEBUG=False, + DATABASES={"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:"}}, + INSTALLED_APPS=["django.contrib.contenttypes", "django.contrib.auth"], + ROOT_URLCONF=None, + ALLOWED_HOSTS=["*"], + SECRET_KEY="nyx-test-key", + USE_TZ=True, + ) + django.setup() + +_nyx_django_setup() +from django.test import RequestFactory + +_view = getattr(_entry_mod, "ping", None) +if _view is None: + # Try class-based view dispatch: find a class whose lowercased name + # matches "ping", instantiate it, and call as_view(). + for attr in dir(_entry_mod): + val = getattr(_entry_mod, attr, None) + if isinstance(val, type): + try: + _view = val.as_view() + break + except Exception: + pass +if _view is None: + print("NYX_DJANGO_VIEW_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(78) + +_factory = RequestFactory() +_path = "/" +_method = "GET" +_query = {} +_data = None +if "query" == "query": + _query["host"] = payload +elif "query" == "body": + _data = payload +elif "query" == "env": + os.environ["host"] = payload +_factory_method = getattr(_factory, _method.lower(), _factory.get) +_request = _factory_method(_path, data=_query or _data, content_type="text/plain" if _data else None) +try: + _resp = _view(_request) + try: + if hasattr(_resp, "render") and not getattr(_resp, "is_rendered", True): + _resp.render() + _content = getattr(_resp, "content", b"") + if isinstance(_content, (bytes, bytearray)): + _content = _content.decode("utf-8", "replace") + print(_content, flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {type(_e).__name__}: {_e}", file=sys.stderr, flush=True) + +sys.settrace(None) diff --git a/tests/dynamic_fixtures/python/fastapi/benign.py b/tests/dynamic_fixtures/python/fastapi/benign.py new file mode 100644 index 00000000..c4ac62bb --- /dev/null +++ b/tests/dynamic_fixtures/python/fastapi/benign.py @@ -0,0 +1,23 @@ +"""Phase 12 — FastAPI route, benign.""" +import re +import subprocess + +from fastapi import FastAPI + +app = FastAPI() + +_VALID_HOST = re.compile(r"^[A-Za-z0-9.-]{1,253}$") + + +@app.get("/ping") +def ping(host: str = ""): + if not _VALID_HOST.fullmatch(host): + return "invalid host" + result = subprocess.run( + ["ping", "-c", "1", host], + shell=False, + capture_output=True, + text=True, + timeout=5, + ) + return result.stdout + result.stderr diff --git a/tests/dynamic_fixtures/python/fastapi/vuln.py b/tests/dynamic_fixtures/python/fastapi/vuln.py new file mode 100644 index 00000000..75f93d33 --- /dev/null +++ b/tests/dynamic_fixtures/python/fastapi/vuln.py @@ -0,0 +1,23 @@ +"""Phase 12 — FastAPI route, vulnerable. + +Nyx harness drives the route through `starlette.testclient.TestClient` +so the framework's normal request pipeline fires without a real socket. +""" +import subprocess + +from fastapi import FastAPI + +app = FastAPI() + + +@app.get("/ping") +def ping(host: str = ""): + """Vulnerable: query parameter flows to subprocess(shell=True).""" + result = subprocess.run( + "ping -c 1 " + host, + shell=True, + capture_output=True, + text=True, + timeout=5, + ) + return result.stdout + result.stderr diff --git a/tests/dynamic_fixtures/python/fastapi/vuln.py.golden_harness.py b/tests/dynamic_fixtures/python/fastapi/vuln.py.golden_harness.py new file mode 100644 index 00000000..8aaa7947 --- /dev/null +++ b/tests/dynamic_fixtures/python/fastapi/vuln.py.golden_harness.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python3 +"""Nyx dynamic harness — auto-generated, do not edit.""" +import os +import sys +import traceback + +# ── Sink-reachability probe (sys.settrace) ──────────────────────────────────── + +# ── __nyx_probe shim (Phase 06 — Track C.1, Phase 08 — Track C.4 + C.5) ────── +# Deny-substring list mirrors crate::dynamic::policy::DENY_KEY_SUBSTRINGS; keep +# in sync when the host-side policy gains new entries. +_NYX_DENY_SUBSTRINGS = ( + "TOKEN", "SECRET", "PASSWORD", "PASSWD", "API_KEY", "APIKEY", + "PRIVATE_KEY", "CREDENTIAL", "SESSION", "COOKIE", "AUTH", "BEARER", + "AWS_ACCESS", "AWS_SESSION", "GH_TOKEN", "GITHUB_TOKEN", "NPM_TOKEN", + "PYPI_TOKEN", "DOCKER_PASS", +) +_NYX_PAYLOAD_LIMIT = 16 * 1024 +_NYX_REDACTED = "" + +def __nyx_scrub_env(): + import os + out = {} + for k, v in os.environ.items(): + ku = str(k).upper() + if any(n in ku for n in _NYX_DENY_SUBSTRINGS): + out[k] = _NYX_REDACTED + else: + out[k] = v + return out + +def __nyx_witness(sink_callee, args): + import os + payload = os.environ.get("NYX_PAYLOAD", "") + payload_bytes = payload.encode("utf-8", "replace") if isinstance(payload, str) else bytes(payload) + if len(payload_bytes) > _NYX_PAYLOAD_LIMIT: + payload_bytes = payload_bytes[:_NYX_PAYLOAD_LIMIT] + args_repr = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + args_repr.append("" % len(a)) + else: + args_repr.append(str(a)) + try: + cwd = os.getcwd() + except OSError: + cwd = "" + return { + "env_snapshot": __nyx_scrub_env(), + "cwd": cwd, + "payload_bytes": list(payload_bytes), + "callee": str(sink_callee), + "args_repr": args_repr, + } + +def __nyx_emit(rec): + import os, json + p = os.environ.get("NYX_PROBE_PATH") + if not p: + return + try: + with open(p, "a") as _f: + _f.write(json.dumps(rec) + "\n") + except OSError: + pass + +def __nyx_probe(sink_callee, *args): + import os, time + serialised = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + serialised.append({"kind": "Bytes", "value": list(a)}) + elif isinstance(a, bool): + serialised.append({"kind": "Int", "value": 1 if a else 0}) + elif isinstance(a, int): + serialised.append({"kind": "Int", "value": a}) + else: + serialised.append({"kind": "String", "value": str(a)}) + rec = { + "sink_callee": str(sink_callee), + "args": serialised, + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Normal"}, + "witness": __nyx_witness(sink_callee, args), + } + __nyx_emit(rec) + +# Phase 08: sink-site signal handler. Call __nyx_install_crash_guard before +# invoking the instrumented sink so a SIGSEGV / SIGABRT / etc. is captured as +# a Crash probe (with witness) before the process aborts. The shim re-raises +# the signal on the default handler after writing so process-level outcome +# observers (exit_code) still see the death. +_NYX_SIGNAL_NAMES = {} + +def __nyx_install_crash_guard(sink_callee): + import signal, os, time + catchable = [] + for nm in ("SIGSEGV", "SIGABRT", "SIGBUS", "SIGFPE", "SIGILL"): + s = getattr(signal, nm, None) + if s is not None: + catchable.append((nm, s)) + _NYX_SIGNAL_NAMES[s] = nm + def _handler(signum, frame): + nm = _NYX_SIGNAL_NAMES.get(signum, "SIG?") + rec = { + "sink_callee": str(sink_callee), + "args": [], + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Crash", "signal": nm}, + "witness": __nyx_witness(sink_callee, []), + } + __nyx_emit(rec) + # Reset to default and re-raise so the process actually dies. + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + for _nm, s in catchable: + try: + signal.signal(s, _handler) + except (OSError, ValueError): + pass + + +_NYX_SINK_FILE = "/" +_NYX_SINK_LINE = 16 +_NYX_SINK_HIT = False + +def _nyx_tracer(frame, event, arg): + global _NYX_SINK_HIT + if not _NYX_SINK_HIT and event == "line": + fname = frame.f_code.co_filename + if fname == _NYX_SINK_FILE or fname.endswith(_NYX_SINK_FILE) or ( + os.path.basename(fname) == os.path.basename(_NYX_SINK_FILE) + ): + if _NYX_SINK_LINE <= frame.f_lineno <= _NYX_SINK_LINE + 5: + _NYX_SINK_HIT = True + print("__NYX_SINK_HIT__", flush=True) + return _nyx_tracer + +sys.settrace(_nyx_tracer) + +# ── Payload loading ──────────────────────────────────────────────────────────── +_payload_raw = os.environb.get(b"NYX_PAYLOAD", b"") +if not _payload_raw: + import base64 + _payload_b64 = os.environ.get("NYX_PAYLOAD_B64", "") + if _payload_b64: + _payload_raw = base64.b64decode(_payload_b64) +try: + payload = _payload_raw.decode("utf-8") +except UnicodeDecodeError: + payload = _payload_raw.decode("latin-1") + +# ── Entry module import ──────────────────────────────────────────────────────── +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, ".") +try: + import vuln as _entry_mod +except ImportError as _e: + print(f"NYX_IMPORT_ERROR: {_e}", file=sys.stderr, flush=True) + sys.exit(77) + +# Shape: FastAPI route — dispatch via starlette.testclient.TestClient. +def _nyx_resolve_fastapi_app(mod): + try: + from fastapi import FastAPI + except ImportError: + return None + for n in ("app", "application"): + v = getattr(mod, n, None) + if isinstance(v, FastAPI): + return v + for attr in dir(mod): + val = getattr(mod, attr, None) + if isinstance(val, FastAPI): + return val + return None + +_app = _nyx_resolve_fastapi_app(_entry_mod) +if _app is None: + print("NYX_FASTAPI_APP_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(78) + +try: + from starlette.testclient import TestClient +except ImportError: + print("NYX_FASTAPI_TESTCLIENT_MISSING", file=sys.stderr, flush=True) + sys.exit(79) + +_path = None +for _r in _app.routes: + _name = getattr(_r, "name", None) + _endpoint = getattr(_r, "endpoint", None) + _endpoint_name = getattr(_endpoint, "__name__", None) + if _name == "ping" or _endpoint_name == "ping": + _path = getattr(_r, "path", None) + break +if _path is None and _app.routes: + _path = getattr(_app.routes[0], "path", None) +if _path is None: + print("NYX_FASTAPI_ROUTE_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(80) + +# Strip path parameters; replace `{param}` with the payload when used +# as the path slot, otherwise with "x". +import re +if "query" == "path": + _path = re.sub(r"\{[^}]+\}", payload, _path, count=1) +else: + _path = re.sub(r"\{[^}]+\}", "x", _path) + +_client = TestClient(_app, raise_server_exceptions=False) +_method = "GET" +_query = {} +_body = None +if "query" == "query": + _query["host"] = payload +elif "query" == "body": + _body = payload +elif "query" == "env": + os.environ["host"] = payload +try: + _resp = _client.request(_method, _path, params=_query, content=_body) + try: + print(_resp.text, flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {type(_e).__name__}: {_e}", file=sys.stderr, flush=True) + +sys.settrace(None) diff --git a/tests/dynamic_fixtures/python/flask/benign.py b/tests/dynamic_fixtures/python/flask/benign.py new file mode 100644 index 00000000..24390dad --- /dev/null +++ b/tests/dynamic_fixtures/python/flask/benign.py @@ -0,0 +1,24 @@ +"""Phase 12 — Flask route, benign.""" +import re +import subprocess + +from flask import Flask, request + +app = Flask(__name__) + +_VALID_HOST = re.compile(r"^[A-Za-z0-9.-]{1,253}$") + + +@app.route("/ping", methods=["GET"]) +def ping(): + host = request.args.get("host", "") + if not _VALID_HOST.fullmatch(host): + return "invalid host" + result = subprocess.run( + ["ping", "-c", "1", host], + shell=False, + capture_output=True, + text=True, + timeout=5, + ) + return result.stdout + result.stderr diff --git a/tests/dynamic_fixtures/python/flask/vuln.py b/tests/dynamic_fixtures/python/flask/vuln.py new file mode 100644 index 00000000..6f3d09b9 --- /dev/null +++ b/tests/dynamic_fixtures/python/flask/vuln.py @@ -0,0 +1,25 @@ +"""Phase 12 — Flask route, vulnerable. + +Vulnerable route reads the `host` query parameter and concatenates it +into a shell command. Nyx harness reaches the route via +`app.test_client()` so no real network listener is bound. +""" +import subprocess + +from flask import Flask, request + +app = Flask(__name__) + + +@app.route("/ping", methods=["GET"]) +def ping(): + """Vulnerable: untrusted query param flows to subprocess(shell=True).""" + host = request.args.get("host", "") + result = subprocess.run( + "ping -c 1 " + host, + shell=True, + capture_output=True, + text=True, + timeout=5, + ) + return result.stdout + result.stderr diff --git a/tests/dynamic_fixtures/python/flask/vuln.py.golden_harness.py b/tests/dynamic_fixtures/python/flask/vuln.py.golden_harness.py new file mode 100644 index 00000000..5db8b05a --- /dev/null +++ b/tests/dynamic_fixtures/python/flask/vuln.py.golden_harness.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python3 +"""Nyx dynamic harness — auto-generated, do not edit.""" +import os +import sys +import traceback + +# ── Sink-reachability probe (sys.settrace) ──────────────────────────────────── + +# ── __nyx_probe shim (Phase 06 — Track C.1, Phase 08 — Track C.4 + C.5) ────── +# Deny-substring list mirrors crate::dynamic::policy::DENY_KEY_SUBSTRINGS; keep +# in sync when the host-side policy gains new entries. +_NYX_DENY_SUBSTRINGS = ( + "TOKEN", "SECRET", "PASSWORD", "PASSWD", "API_KEY", "APIKEY", + "PRIVATE_KEY", "CREDENTIAL", "SESSION", "COOKIE", "AUTH", "BEARER", + "AWS_ACCESS", "AWS_SESSION", "GH_TOKEN", "GITHUB_TOKEN", "NPM_TOKEN", + "PYPI_TOKEN", "DOCKER_PASS", +) +_NYX_PAYLOAD_LIMIT = 16 * 1024 +_NYX_REDACTED = "" + +def __nyx_scrub_env(): + import os + out = {} + for k, v in os.environ.items(): + ku = str(k).upper() + if any(n in ku for n in _NYX_DENY_SUBSTRINGS): + out[k] = _NYX_REDACTED + else: + out[k] = v + return out + +def __nyx_witness(sink_callee, args): + import os + payload = os.environ.get("NYX_PAYLOAD", "") + payload_bytes = payload.encode("utf-8", "replace") if isinstance(payload, str) else bytes(payload) + if len(payload_bytes) > _NYX_PAYLOAD_LIMIT: + payload_bytes = payload_bytes[:_NYX_PAYLOAD_LIMIT] + args_repr = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + args_repr.append("" % len(a)) + else: + args_repr.append(str(a)) + try: + cwd = os.getcwd() + except OSError: + cwd = "" + return { + "env_snapshot": __nyx_scrub_env(), + "cwd": cwd, + "payload_bytes": list(payload_bytes), + "callee": str(sink_callee), + "args_repr": args_repr, + } + +def __nyx_emit(rec): + import os, json + p = os.environ.get("NYX_PROBE_PATH") + if not p: + return + try: + with open(p, "a") as _f: + _f.write(json.dumps(rec) + "\n") + except OSError: + pass + +def __nyx_probe(sink_callee, *args): + import os, time + serialised = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + serialised.append({"kind": "Bytes", "value": list(a)}) + elif isinstance(a, bool): + serialised.append({"kind": "Int", "value": 1 if a else 0}) + elif isinstance(a, int): + serialised.append({"kind": "Int", "value": a}) + else: + serialised.append({"kind": "String", "value": str(a)}) + rec = { + "sink_callee": str(sink_callee), + "args": serialised, + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Normal"}, + "witness": __nyx_witness(sink_callee, args), + } + __nyx_emit(rec) + +# Phase 08: sink-site signal handler. Call __nyx_install_crash_guard before +# invoking the instrumented sink so a SIGSEGV / SIGABRT / etc. is captured as +# a Crash probe (with witness) before the process aborts. The shim re-raises +# the signal on the default handler after writing so process-level outcome +# observers (exit_code) still see the death. +_NYX_SIGNAL_NAMES = {} + +def __nyx_install_crash_guard(sink_callee): + import signal, os, time + catchable = [] + for nm in ("SIGSEGV", "SIGABRT", "SIGBUS", "SIGFPE", "SIGILL"): + s = getattr(signal, nm, None) + if s is not None: + catchable.append((nm, s)) + _NYX_SIGNAL_NAMES[s] = nm + def _handler(signum, frame): + nm = _NYX_SIGNAL_NAMES.get(signum, "SIG?") + rec = { + "sink_callee": str(sink_callee), + "args": [], + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Crash", "signal": nm}, + "witness": __nyx_witness(sink_callee, []), + } + __nyx_emit(rec) + # Reset to default and re-raise so the process actually dies. + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + for _nm, s in catchable: + try: + signal.signal(s, _handler) + except (OSError, ValueError): + pass + + +_NYX_SINK_FILE = "/" +_NYX_SINK_LINE = 18 +_NYX_SINK_HIT = False + +def _nyx_tracer(frame, event, arg): + global _NYX_SINK_HIT + if not _NYX_SINK_HIT and event == "line": + fname = frame.f_code.co_filename + if fname == _NYX_SINK_FILE or fname.endswith(_NYX_SINK_FILE) or ( + os.path.basename(fname) == os.path.basename(_NYX_SINK_FILE) + ): + if _NYX_SINK_LINE <= frame.f_lineno <= _NYX_SINK_LINE + 5: + _NYX_SINK_HIT = True + print("__NYX_SINK_HIT__", flush=True) + return _nyx_tracer + +sys.settrace(_nyx_tracer) + +# ── Payload loading ──────────────────────────────────────────────────────────── +_payload_raw = os.environb.get(b"NYX_PAYLOAD", b"") +if not _payload_raw: + import base64 + _payload_b64 = os.environ.get("NYX_PAYLOAD_B64", "") + if _payload_b64: + _payload_raw = base64.b64decode(_payload_b64) +try: + payload = _payload_raw.decode("utf-8") +except UnicodeDecodeError: + payload = _payload_raw.decode("latin-1") + +# ── Entry module import ──────────────────────────────────────────────────────── +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, ".") +try: + import vuln as _entry_mod +except ImportError as _e: + print(f"NYX_IMPORT_ERROR: {_e}", file=sys.stderr, flush=True) + sys.exit(77) + +# Shape: Flask route — dispatch via app.test_client(). +def _nyx_resolve_flask_app(mod): + from flask import Flask + candidates = [getattr(mod, n, None) for n in ("app", "application", "create_app")] + for c in candidates: + if callable(c) and not isinstance(c, Flask): + try: + got = c() + if isinstance(got, Flask): + return got + except TypeError: + pass + if isinstance(c, Flask): + return c + for attr in dir(mod): + val = getattr(mod, attr, None) + if isinstance(val, Flask): + return val + return None + +_app = _nyx_resolve_flask_app(_entry_mod) +if _app is None: + print("NYX_FLASK_APP_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(78) + +_route = None +for _r in _app.url_map.iter_rules(): + if _r.endpoint == "ping" or _r.endpoint.endswith("." + "ping"): + _route = _r + break +if _route is None: + # Fall back: any rule will do, but pick the first POST/GET. + _rules = list(_app.url_map.iter_rules()) + _route = _rules[0] if _rules else None +if _route is None: + print("NYX_FLASK_ROUTE_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(79) + +_path = _route.rule +# Strip route parameters; replace `` with payload when used as +# the path slot, otherwise with "x". +import re +if "query" == "path": + _path = re.sub(r"<[^>]+>", payload, _path, count=1) +else: + _path = re.sub(r"<[^>]+>", "x", _path) + +_client = _app.test_client() +_method = "GET" +_query = {} +_data = None +if "query" == "query": + _query["host"] = payload +elif "query" == "body": + _data = payload +elif "query" == "env": + os.environ["host"] = payload +try: + _resp = _client.open(_path, method=_method, query_string=_query, data=_data) + try: + print(_resp.get_data(as_text=True), flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {type(_e).__name__}: {_e}", file=sys.stderr, flush=True) + +sys.settrace(None) diff --git a/tests/dynamic_fixtures/python/generic/benign.py b/tests/dynamic_fixtures/python/generic/benign.py new file mode 100644 index 00000000..637c32e9 --- /dev/null +++ b/tests/dynamic_fixtures/python/generic/benign.py @@ -0,0 +1,28 @@ +"""Phase 12 — generic shape, benign. + +Validates the input against a strict allow-list (alphanumerics + dots +only — RFC-1035 hostname character set) and refuses to shell out when +the input contains anything outside the allow-list. The CMDI marker +substring (`NYX_PWN_CMDI`) never reaches stdout because the function +returns before any subprocess call when the validation fails. +""" +import re +import subprocess + +_VALID_HOST = re.compile(r"^[A-Za-z0-9.-]{1,253}$") + + +def run_ping(host): + """Safe: allow-list validation; refuse and return on mismatch.""" + if not _VALID_HOST.fullmatch(host or ""): + print("invalid host") + return + result = subprocess.run( + ["ping", "-c", "1", host], + shell=False, + capture_output=True, + text=True, + timeout=5, + ) + print(result.stdout) + print(result.stderr, end="") diff --git a/tests/dynamic_fixtures/python/generic/vuln.py b/tests/dynamic_fixtures/python/generic/vuln.py new file mode 100644 index 00000000..6a4dc990 --- /dev/null +++ b/tests/dynamic_fixtures/python/generic/vuln.py @@ -0,0 +1,20 @@ +"""Phase 12 — generic shape, vulnerable. + +Module-level function that shells out with user input directly +concatenated. Mirrors `cmdi_positive.py` but lives under the per-shape +fixture tree so the shape detector hits the `Generic` path. +""" +import subprocess + + +def run_ping(host): + """Vulnerable: user input concatenated into shell command.""" + result = subprocess.run( + "ping -c 1 " + host, + shell=True, + capture_output=True, + text=True, + timeout=5, + ) + print(result.stdout) + print(result.stderr, end="") diff --git a/tests/dynamic_fixtures/python/generic/vuln.py.golden_harness.py b/tests/dynamic_fixtures/python/generic/vuln.py.golden_harness.py new file mode 100644 index 00000000..21ffeb8e --- /dev/null +++ b/tests/dynamic_fixtures/python/generic/vuln.py.golden_harness.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +"""Nyx dynamic harness — auto-generated, do not edit.""" +import os +import sys +import traceback + +# ── Sink-reachability probe (sys.settrace) ──────────────────────────────────── + +# ── __nyx_probe shim (Phase 06 — Track C.1, Phase 08 — Track C.4 + C.5) ────── +# Deny-substring list mirrors crate::dynamic::policy::DENY_KEY_SUBSTRINGS; keep +# in sync when the host-side policy gains new entries. +_NYX_DENY_SUBSTRINGS = ( + "TOKEN", "SECRET", "PASSWORD", "PASSWD", "API_KEY", "APIKEY", + "PRIVATE_KEY", "CREDENTIAL", "SESSION", "COOKIE", "AUTH", "BEARER", + "AWS_ACCESS", "AWS_SESSION", "GH_TOKEN", "GITHUB_TOKEN", "NPM_TOKEN", + "PYPI_TOKEN", "DOCKER_PASS", +) +_NYX_PAYLOAD_LIMIT = 16 * 1024 +_NYX_REDACTED = "" + +def __nyx_scrub_env(): + import os + out = {} + for k, v in os.environ.items(): + ku = str(k).upper() + if any(n in ku for n in _NYX_DENY_SUBSTRINGS): + out[k] = _NYX_REDACTED + else: + out[k] = v + return out + +def __nyx_witness(sink_callee, args): + import os + payload = os.environ.get("NYX_PAYLOAD", "") + payload_bytes = payload.encode("utf-8", "replace") if isinstance(payload, str) else bytes(payload) + if len(payload_bytes) > _NYX_PAYLOAD_LIMIT: + payload_bytes = payload_bytes[:_NYX_PAYLOAD_LIMIT] + args_repr = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + args_repr.append("" % len(a)) + else: + args_repr.append(str(a)) + try: + cwd = os.getcwd() + except OSError: + cwd = "" + return { + "env_snapshot": __nyx_scrub_env(), + "cwd": cwd, + "payload_bytes": list(payload_bytes), + "callee": str(sink_callee), + "args_repr": args_repr, + } + +def __nyx_emit(rec): + import os, json + p = os.environ.get("NYX_PROBE_PATH") + if not p: + return + try: + with open(p, "a") as _f: + _f.write(json.dumps(rec) + "\n") + except OSError: + pass + +def __nyx_probe(sink_callee, *args): + import os, time + serialised = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + serialised.append({"kind": "Bytes", "value": list(a)}) + elif isinstance(a, bool): + serialised.append({"kind": "Int", "value": 1 if a else 0}) + elif isinstance(a, int): + serialised.append({"kind": "Int", "value": a}) + else: + serialised.append({"kind": "String", "value": str(a)}) + rec = { + "sink_callee": str(sink_callee), + "args": serialised, + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Normal"}, + "witness": __nyx_witness(sink_callee, args), + } + __nyx_emit(rec) + +# Phase 08: sink-site signal handler. Call __nyx_install_crash_guard before +# invoking the instrumented sink so a SIGSEGV / SIGABRT / etc. is captured as +# a Crash probe (with witness) before the process aborts. The shim re-raises +# the signal on the default handler after writing so process-level outcome +# observers (exit_code) still see the death. +_NYX_SIGNAL_NAMES = {} + +def __nyx_install_crash_guard(sink_callee): + import signal, os, time + catchable = [] + for nm in ("SIGSEGV", "SIGABRT", "SIGBUS", "SIGFPE", "SIGILL"): + s = getattr(signal, nm, None) + if s is not None: + catchable.append((nm, s)) + _NYX_SIGNAL_NAMES[s] = nm + def _handler(signum, frame): + nm = _NYX_SIGNAL_NAMES.get(signum, "SIG?") + rec = { + "sink_callee": str(sink_callee), + "args": [], + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Crash", "signal": nm}, + "witness": __nyx_witness(sink_callee, []), + } + __nyx_emit(rec) + # Reset to default and re-raise so the process actually dies. + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + for _nm, s in catchable: + try: + signal.signal(s, _handler) + except (OSError, ValueError): + pass + + +_NYX_SINK_FILE = "/" +_NYX_SINK_LINE = 12 +_NYX_SINK_HIT = False + +def _nyx_tracer(frame, event, arg): + global _NYX_SINK_HIT + if not _NYX_SINK_HIT and event == "line": + fname = frame.f_code.co_filename + if fname == _NYX_SINK_FILE or fname.endswith(_NYX_SINK_FILE) or ( + os.path.basename(fname) == os.path.basename(_NYX_SINK_FILE) + ): + if _NYX_SINK_LINE <= frame.f_lineno <= _NYX_SINK_LINE + 5: + _NYX_SINK_HIT = True + print("__NYX_SINK_HIT__", flush=True) + return _nyx_tracer + +sys.settrace(_nyx_tracer) + +# ── Payload loading ──────────────────────────────────────────────────────────── +_payload_raw = os.environb.get(b"NYX_PAYLOAD", b"") +if not _payload_raw: + import base64 + _payload_b64 = os.environ.get("NYX_PAYLOAD_B64", "") + if _payload_b64: + _payload_raw = base64.b64decode(_payload_b64) +try: + payload = _payload_raw.decode("utf-8") +except UnicodeDecodeError: + payload = _payload_raw.decode("latin-1") + +# ── Entry module import ──────────────────────────────────────────────────────── +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, ".") +try: + import vuln as _entry_mod +except ImportError as _e: + print(f"NYX_IMPORT_ERROR: {_e}", file=sys.stderr, flush=True) + sys.exit(77) + +# Shape: generic module-level function. + +try: + _result = _entry_mod.run_ping(payload) + if _result is not None: + try: + print(str(_result), flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {type(_e).__name__}: {_e}", file=sys.stderr, flush=True) + +sys.settrace(None) diff --git a/tests/dynamic_fixtures/python/pytest/benign.py b/tests/dynamic_fixtures/python/pytest/benign.py new file mode 100644 index 00000000..26f73869 --- /dev/null +++ b/tests/dynamic_fixtures/python/pytest/benign.py @@ -0,0 +1,22 @@ +"""Phase 12 — pytest shape, benign.""" +import os +import re +import subprocess + +_VALID_HOST = re.compile(r"^[A-Za-z0-9.-]{1,253}$") + + +def test_run_ping(): + host = os.environ.get("NYX_PAYLOAD", "") + if not _VALID_HOST.fullmatch(host): + print("invalid host") + return + result = subprocess.run( + ["ping", "-c", "1", host], + shell=False, + capture_output=True, + text=True, + timeout=5, + ) + print(result.stdout) + print(result.stderr, end="") diff --git a/tests/dynamic_fixtures/python/pytest/vuln.py b/tests/dynamic_fixtures/python/pytest/vuln.py new file mode 100644 index 00000000..38bab83d --- /dev/null +++ b/tests/dynamic_fixtures/python/pytest/vuln.py @@ -0,0 +1,22 @@ +"""Phase 12 — pytest shape, vulnerable. + +Pytest convention: function name starts with `test_`. Nyx harness +injects the payload via the `NYX_PAYLOAD` env var (the same channel +pytest fixtures typically read from). +""" +import os +import subprocess + + +def test_run_ping(): + """Vulnerable test: reads host from env, concatenates into shell.""" + host = os.environ.get("NYX_PAYLOAD", "") + result = subprocess.run( + "ping -c 1 " + host, + shell=True, + capture_output=True, + text=True, + timeout=5, + ) + print(result.stdout) + print(result.stderr, end="") diff --git a/tests/dynamic_fixtures/python/pytest/vuln.py.golden_harness.py b/tests/dynamic_fixtures/python/pytest/vuln.py.golden_harness.py new file mode 100644 index 00000000..a5901bd9 --- /dev/null +++ b/tests/dynamic_fixtures/python/pytest/vuln.py.golden_harness.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 +"""Nyx dynamic harness — auto-generated, do not edit.""" +import os +import sys +import traceback + +# ── Sink-reachability probe (sys.settrace) ──────────────────────────────────── + +# ── __nyx_probe shim (Phase 06 — Track C.1, Phase 08 — Track C.4 + C.5) ────── +# Deny-substring list mirrors crate::dynamic::policy::DENY_KEY_SUBSTRINGS; keep +# in sync when the host-side policy gains new entries. +_NYX_DENY_SUBSTRINGS = ( + "TOKEN", "SECRET", "PASSWORD", "PASSWD", "API_KEY", "APIKEY", + "PRIVATE_KEY", "CREDENTIAL", "SESSION", "COOKIE", "AUTH", "BEARER", + "AWS_ACCESS", "AWS_SESSION", "GH_TOKEN", "GITHUB_TOKEN", "NPM_TOKEN", + "PYPI_TOKEN", "DOCKER_PASS", +) +_NYX_PAYLOAD_LIMIT = 16 * 1024 +_NYX_REDACTED = "" + +def __nyx_scrub_env(): + import os + out = {} + for k, v in os.environ.items(): + ku = str(k).upper() + if any(n in ku for n in _NYX_DENY_SUBSTRINGS): + out[k] = _NYX_REDACTED + else: + out[k] = v + return out + +def __nyx_witness(sink_callee, args): + import os + payload = os.environ.get("NYX_PAYLOAD", "") + payload_bytes = payload.encode("utf-8", "replace") if isinstance(payload, str) else bytes(payload) + if len(payload_bytes) > _NYX_PAYLOAD_LIMIT: + payload_bytes = payload_bytes[:_NYX_PAYLOAD_LIMIT] + args_repr = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + args_repr.append("" % len(a)) + else: + args_repr.append(str(a)) + try: + cwd = os.getcwd() + except OSError: + cwd = "" + return { + "env_snapshot": __nyx_scrub_env(), + "cwd": cwd, + "payload_bytes": list(payload_bytes), + "callee": str(sink_callee), + "args_repr": args_repr, + } + +def __nyx_emit(rec): + import os, json + p = os.environ.get("NYX_PROBE_PATH") + if not p: + return + try: + with open(p, "a") as _f: + _f.write(json.dumps(rec) + "\n") + except OSError: + pass + +def __nyx_probe(sink_callee, *args): + import os, time + serialised = [] + for a in args: + if isinstance(a, (bytes, bytearray)): + serialised.append({"kind": "Bytes", "value": list(a)}) + elif isinstance(a, bool): + serialised.append({"kind": "Int", "value": 1 if a else 0}) + elif isinstance(a, int): + serialised.append({"kind": "Int", "value": a}) + else: + serialised.append({"kind": "String", "value": str(a)}) + rec = { + "sink_callee": str(sink_callee), + "args": serialised, + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Normal"}, + "witness": __nyx_witness(sink_callee, args), + } + __nyx_emit(rec) + +# Phase 08: sink-site signal handler. Call __nyx_install_crash_guard before +# invoking the instrumented sink so a SIGSEGV / SIGABRT / etc. is captured as +# a Crash probe (with witness) before the process aborts. The shim re-raises +# the signal on the default handler after writing so process-level outcome +# observers (exit_code) still see the death. +_NYX_SIGNAL_NAMES = {} + +def __nyx_install_crash_guard(sink_callee): + import signal, os, time + catchable = [] + for nm in ("SIGSEGV", "SIGABRT", "SIGBUS", "SIGFPE", "SIGILL"): + s = getattr(signal, nm, None) + if s is not None: + catchable.append((nm, s)) + _NYX_SIGNAL_NAMES[s] = nm + def _handler(signum, frame): + nm = _NYX_SIGNAL_NAMES.get(signum, "SIG?") + rec = { + "sink_callee": str(sink_callee), + "args": [], + "captured_at_ns": time.time_ns(), + "payload_id": os.environ.get("NYX_PAYLOAD_ID", ""), + "kind": {"kind": "Crash", "signal": nm}, + "witness": __nyx_witness(sink_callee, []), + } + __nyx_emit(rec) + # Reset to default and re-raise so the process actually dies. + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + for _nm, s in catchable: + try: + signal.signal(s, _handler) + except (OSError, ValueError): + pass + + +_NYX_SINK_FILE = "/" +_NYX_SINK_LINE = 14 +_NYX_SINK_HIT = False + +def _nyx_tracer(frame, event, arg): + global _NYX_SINK_HIT + if not _NYX_SINK_HIT and event == "line": + fname = frame.f_code.co_filename + if fname == _NYX_SINK_FILE or fname.endswith(_NYX_SINK_FILE) or ( + os.path.basename(fname) == os.path.basename(_NYX_SINK_FILE) + ): + if _NYX_SINK_LINE <= frame.f_lineno <= _NYX_SINK_LINE + 5: + _NYX_SINK_HIT = True + print("__NYX_SINK_HIT__", flush=True) + return _nyx_tracer + +sys.settrace(_nyx_tracer) + +# ── Payload loading ──────────────────────────────────────────────────────────── +_payload_raw = os.environb.get(b"NYX_PAYLOAD", b"") +if not _payload_raw: + import base64 + _payload_b64 = os.environ.get("NYX_PAYLOAD_B64", "") + if _payload_b64: + _payload_raw = base64.b64decode(_payload_b64) +try: + payload = _payload_raw.decode("utf-8") +except UnicodeDecodeError: + payload = _payload_raw.decode("latin-1") + +# ── Entry module import ──────────────────────────────────────────────────────── +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, ".") +try: + import vuln as _entry_mod +except ImportError as _e: + print(f"NYX_IMPORT_ERROR: {_e}", file=sys.stderr, flush=True) + sys.exit(77) + +# Shape: pytest function — drive the single test directly. +os.environ["NYX_PAYLOAD"] = payload +try: + _result = _entry_mod.test_run_ping() + if _result is not None: + try: + print(str(_result), flush=True) + except Exception: + pass +except AssertionError as _e: + # AssertionError is the typical pytest failure path; observable. + print(f"NYX_ASSERT: {_e}", file=sys.stderr, flush=True) +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {type(_e).__name__}: {_e}", file=sys.stderr, flush=True) + +sys.settrace(None) diff --git a/tests/dynamic_fixtures/spec_strategies/callgraph_entry_http.rs b/tests/dynamic_fixtures/spec_strategies/callgraph_entry_http.rs new file mode 100644 index 00000000..a6b90ac0 --- /dev/null +++ b/tests/dynamic_fixtures/spec_strategies/callgraph_entry_http.rs @@ -0,0 +1,12 @@ +// Fixture: spec derived via FromCallgraphEntry (rule id matches `*.http.*`, +// entry point classified as HttpRoute). +// +// Phase 12 — Track B added HttpRoute to the Python emitter's SUPPORTED list, +// so to keep the entry-kind gate test honest the fixture targets Rust, whose +// emitter still advertises `[EntryKind::Function]` only. + +use actix_web::{web, HttpResponse, Responder}; + +pub async fn echo(query: web::Query>) -> impl Responder { + HttpResponse::Ok().body(query.get("q").cloned().unwrap_or_default()) +} diff --git a/tests/python_fixtures.rs b/tests/python_fixtures.rs index 7b8dff21..7e8d0df8 100644 --- a/tests/python_fixtures.rs +++ b/tests/python_fixtures.rs @@ -14,12 +14,15 @@ mod common; #[cfg(feature = "dynamic")] mod python_fixture_tests { use crate::common::fixture_harness::{ - run_fixture_and_compare_to_golden, CopyStrategy, FixtureSpec, + run_fixture_and_compare_to_golden, run_harness_snapshot, run_shape_fixture, + CopyStrategy, FixtureSpec, }; use nyx_scanner::commands::scan::Diag; + use nyx_scanner::dynamic::spec::PayloadSlot; use nyx_scanner::dynamic::verify::{verify_finding, VerifyOptions}; use nyx_scanner::evidence::{ - Confidence, Evidence, FlowStep, FlowStepKind, UnsupportedReason, VerifyStatus, + Confidence, EntryKind, Evidence, FlowStep, FlowStepKind, UnsupportedReason, + VerifyStatus, }; use nyx_scanner::labels::Cap; use nyx_scanner::patterns::{FindingCategory, Severity}; @@ -275,6 +278,328 @@ mod python_fixture_tests { } } + // ── Phase 12 — per-shape acceptance ────────────────────────────────────── + // + // For each shape the suite asserts: + // 1. The vuln fixture confirms (oracle fires, sink hit). + // 2. The benign fixture does NOT confirm. + // 3. The emitted harness source matches the per-shape golden + // snapshot under `tests/dynamic_fixtures/python//`. + // + // Framework-bound shapes (Flask / FastAPI / Django / Celery) skip + // with an `eprintln!` when the framework is unimportable in the + // host's `python3` (and therefore unavailable to the harness's + // built venv without a successful pip install). + + fn python_module_available(module: &'static str) -> bool { + std::process::Command::new("python3") + .arg("-c") + .arg(format!("import {module}")) + .output() + .map(|o| o.status.success()) + .unwrap_or(false) + } + + fn assert_confirmed(shape: &str, result: &nyx_scanner::evidence::VerifyResult) { + assert_eq!( + result.status, + VerifyStatus::Confirmed, + "{shape}/vuln.py: expected Confirmed, got {:?} ({:?})", + result.status, + result.detail, + ); + } + + fn assert_not_confirmed(shape: &str, result: &nyx_scanner::evidence::VerifyResult) { + assert!( + matches!( + result.status, + VerifyStatus::NotConfirmed | VerifyStatus::Inconclusive + ), + "{shape}/benign.py: expected NotConfirmed (or Inconclusive), got {:?} ({:?})", + result.status, + result.detail, + ); + // Tighter check: a benign fixture must never light up `Confirmed`. + assert_ne!( + result.status, + VerifyStatus::Confirmed, + "{shape}/benign.py: must not confirm", + ); + } + + // ── generic ───────────────────────────────────────────────────────────── + + #[test] + fn generic_vuln_is_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + let r = run_shape_fixture( + "generic", "vuln.py", "run_ping", Cap::CODE_EXEC, 12, + EntryKind::Function, PayloadSlot::Param(0), + ); + assert_confirmed("generic", &r); + } + + #[test] + fn generic_benign_not_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + let r = run_shape_fixture( + "generic", "benign.py", "run_ping", Cap::CODE_EXEC, 20, + EntryKind::Function, PayloadSlot::Param(0), + ); + assert_not_confirmed("generic", &r); + } + + #[test] + fn generic_harness_snapshot_matches_golden() { + run_harness_snapshot( + "generic", "vuln.py", "run_ping", Cap::CODE_EXEC, 12, + EntryKind::Function, PayloadSlot::Param(0), + ); + } + + // ── cli ───────────────────────────────────────────────────────────────── + + #[test] + fn cli_vuln_is_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + let r = run_shape_fixture( + "cli", "vuln.py", "main", Cap::CODE_EXEC, 14, + EntryKind::CliSubcommand, PayloadSlot::Argv(0), + ); + assert_confirmed("cli", &r); + } + + #[test] + fn cli_benign_not_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + let r = run_shape_fixture( + "cli", "benign.py", "main", Cap::CODE_EXEC, 11, + EntryKind::CliSubcommand, PayloadSlot::Argv(0), + ); + assert_not_confirmed("cli", &r); + } + + #[test] + fn cli_harness_snapshot_matches_golden() { + run_harness_snapshot( + "cli", "vuln.py", "main", Cap::CODE_EXEC, 14, + EntryKind::CliSubcommand, PayloadSlot::Argv(0), + ); + } + + // ── pytest ────────────────────────────────────────────────────────────── + + #[test] + fn pytest_vuln_is_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + let r = run_shape_fixture( + "pytest", "vuln.py", "test_run_ping", Cap::CODE_EXEC, 14, + EntryKind::Function, PayloadSlot::EnvVar("NYX_PAYLOAD".into()), + ); + assert_confirmed("pytest", &r); + } + + #[test] + fn pytest_benign_not_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + let r = run_shape_fixture( + "pytest", "benign.py", "test_run_ping", Cap::CODE_EXEC, 14, + EntryKind::Function, PayloadSlot::EnvVar("NYX_PAYLOAD".into()), + ); + assert_not_confirmed("pytest", &r); + } + + #[test] + fn pytest_harness_snapshot_matches_golden() { + run_harness_snapshot( + "pytest", "vuln.py", "test_run_ping", Cap::CODE_EXEC, 14, + EntryKind::Function, PayloadSlot::EnvVar("NYX_PAYLOAD".into()), + ); + } + + // ── async ─────────────────────────────────────────────────────────────── + + #[test] + fn async_vuln_is_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + let r = run_shape_fixture( + "async", "vuln.py", "run_ping", Cap::CODE_EXEC, 13, + EntryKind::Function, PayloadSlot::Param(0), + ); + assert_confirmed("async", &r); + } + + #[test] + fn async_benign_not_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + let r = run_shape_fixture( + "async", "benign.py", "run_ping", Cap::CODE_EXEC, 14, + EntryKind::Function, PayloadSlot::Param(0), + ); + assert_not_confirmed("async", &r); + } + + #[test] + fn async_harness_snapshot_matches_golden() { + run_harness_snapshot( + "async", "vuln.py", "run_ping", Cap::CODE_EXEC, 13, + EntryKind::Function, PayloadSlot::Param(0), + ); + } + + // ── celery ────────────────────────────────────────────────────────────── + + #[test] + fn celery_vuln_is_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + if !python_module_available("celery") { + eprintln!("SKIP: celery not importable"); + return; + } + let r = run_shape_fixture( + "celery", "vuln.py", "run_job", Cap::CODE_EXEC, 17, + EntryKind::Function, PayloadSlot::Param(0), + ); + assert_confirmed("celery", &r); + } + + #[test] + fn celery_benign_not_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + if !python_module_available("celery") { + eprintln!("SKIP: celery not importable"); + return; + } + let r = run_shape_fixture( + "celery", "benign.py", "run_job", Cap::CODE_EXEC, 17, + EntryKind::Function, PayloadSlot::Param(0), + ); + assert_not_confirmed("celery", &r); + } + + #[test] + fn celery_harness_snapshot_matches_golden() { + run_harness_snapshot( + "celery", "vuln.py", "run_job", Cap::CODE_EXEC, 17, + EntryKind::Function, PayloadSlot::Param(0), + ); + } + + // ── flask ─────────────────────────────────────────────────────────────── + + #[test] + fn flask_vuln_is_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + if !python_module_available("flask") { + eprintln!("SKIP: flask not importable"); + return; + } + let r = run_shape_fixture( + "flask", "vuln.py", "ping", Cap::CODE_EXEC, 18, + EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()), + ); + assert_confirmed("flask", &r); + } + + #[test] + fn flask_benign_not_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + if !python_module_available("flask") { + eprintln!("SKIP: flask not importable"); + return; + } + let r = run_shape_fixture( + "flask", "benign.py", "ping", Cap::CODE_EXEC, 17, + EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()), + ); + assert_not_confirmed("flask", &r); + } + + #[test] + fn flask_harness_snapshot_matches_golden() { + run_harness_snapshot( + "flask", "vuln.py", "ping", Cap::CODE_EXEC, 18, + EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()), + ); + } + + // ── fastapi ───────────────────────────────────────────────────────────── + + #[test] + fn fastapi_vuln_is_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + if !python_module_available("fastapi") { + eprintln!("SKIP: fastapi not importable"); + return; + } + let r = run_shape_fixture( + "fastapi", "vuln.py", "ping", Cap::CODE_EXEC, 16, + EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()), + ); + assert_confirmed("fastapi", &r); + } + + #[test] + fn fastapi_benign_not_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + if !python_module_available("fastapi") { + eprintln!("SKIP: fastapi not importable"); + return; + } + let r = run_shape_fixture( + "fastapi", "benign.py", "ping", Cap::CODE_EXEC, 16, + EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()), + ); + assert_not_confirmed("fastapi", &r); + } + + #[test] + fn fastapi_harness_snapshot_matches_golden() { + run_harness_snapshot( + "fastapi", "vuln.py", "ping", Cap::CODE_EXEC, 16, + EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()), + ); + } + + // ── django ────────────────────────────────────────────────────────────── + + #[test] + fn django_vuln_is_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + if !python_module_available("django") { + eprintln!("SKIP: django not importable"); + return; + } + let r = run_shape_fixture( + "django", "vuln.py", "ping", Cap::CODE_EXEC, 15, + EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()), + ); + assert_confirmed("django", &r); + } + + #[test] + fn django_benign_not_confirmed() { + if !python3_available() { eprintln!("SKIP: python3 not available"); return; } + if !python_module_available("django") { + eprintln!("SKIP: django not importable"); + return; + } + let r = run_shape_fixture( + "django", "benign.py", "ping", Cap::CODE_EXEC, 14, + EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()), + ); + assert_not_confirmed("django", &r); + } + + #[test] + fn django_harness_snapshot_matches_golden() { + run_harness_snapshot( + "django", "vuln.py", "ping", Cap::CODE_EXEC, 15, + EntryKind::HttpRoute, PayloadSlot::QueryParam("host".into()), + ); + } + /// Sensitive-filename gate fires before any harness execution; no /// python3 needed. #[test] diff --git a/tests/spec_derivation_strategies.rs b/tests/spec_derivation_strategies.rs index 85961c65..5e27fa9e 100644 --- a/tests/spec_derivation_strategies.rs +++ b/tests/spec_derivation_strategies.rs @@ -320,14 +320,16 @@ mod spec_strategies { /// `Inconclusive(EntryKindUnsupported { lang, attempted, supported, hint })` /// rather than `Unsupported`. End-to-end coverage: /// - construct an HttpRoute spec via `derive_from_callgraph_entry` - /// (Python emitter currently advertises `[Function]` only); + /// against a language whose emitter still advertises `[Function]` + /// only (Rust, post Phase 12 — the Python emitter now supports + /// `HttpRoute` and would short-circuit the gate); /// - drive it through `verify_finding`; /// - assert the verdict shape matches the promise. #[test] fn entry_kind_gate_promotes_unsupported_to_inconclusive_with_hint() { let mut diag = make_diag( - "py.http.flask_route", - "tests/dynamic_fixtures/spec_strategies/callgraph_entry_http.py", + "rs.http.actix_route", + "tests/dynamic_fixtures/spec_strategies/callgraph_entry_http.rs", 8, ); let mut ev = Evidence::default(); @@ -357,7 +359,7 @@ mod spec_strategies { supported, hint, }) => { - assert_eq!(lang, nyx_scanner::symbol::Lang::Python); + assert_eq!(lang, nyx_scanner::symbol::Lang::Rust); assert!(matches!(attempted, EntryKind::HttpRoute)); assert!( !supported.is_empty(), @@ -365,7 +367,7 @@ mod spec_strategies { ); assert!( supported.contains(&EntryKind::Function), - "Python emitter must advertise Function support; got {supported:?}" + "Rust emitter must advertise Function support; got {supported:?}" ); assert!( !hint.is_empty(),