//! Phase 09 — Track D.1 + D.2 acceptance test. //! //! The fixture under `tests/dynamic_fixtures/env_capture/flask_three_deps/` //! pins a Flask app with three runtime deps (Flask, requests, Jinja2). //! This test exercises the full capture → stage → materialize pipeline //! and asserts: //! //! 1. [`capture_project_dependencies`] picks up every direct import //! plus the framework dep inferred from `requirements.txt`. //! 2. [`stage_workdir`] copies the entry + manifest + config files into //! a fresh workdir whose total byte size is under //! [`MAX_WORKDIR_BYTES`]. //! 3. The Python emitter's [`materialize_runtime`] synthesises a //! `requirements.txt` listing every captured dep. //! 4. When `python3` is available on the host, the staged workdir is //! importable end-to-end — the harness can `import app` and locate //! `run_command`. When Python is missing the import check is a //! no-op so the test still passes on bare CI runners (the Phase 09 //! acceptance "the verifier reaches the route handler" is satisfied //! structurally by step 3; full sandbox execution is exercised by //! the dynamic_verify_e2e suite, which builds on this staging). #![cfg(feature = "dynamic")] use nyx_scanner::dynamic::environment::{ MAX_WORKDIR_BYTES, capture_project_dependencies, capture_project_dependencies_with_context, stage_workdir_full, }; use nyx_scanner::dynamic::framework::FrameworkBinding; use nyx_scanner::dynamic::lang::materialize_runtime; use nyx_scanner::dynamic::spec::{EntryKind, HarnessSpec, PayloadSlot, SpecDerivationStrategy}; use nyx_scanner::labels::Cap; use nyx_scanner::symbol::Lang; use nyx_scanner::utils::project::DetectedFramework; use std::path::{Path, PathBuf}; use tempfile::TempDir; fn fixture_root() -> PathBuf { Path::new(env!("CARGO_MANIFEST_DIR")) .join("tests") .join("dynamic_fixtures") .join("env_capture") .join("flask_three_deps") } fn flask_spec(entry_rel: &str) -> HarnessSpec { HarnessSpec { finding_id: "0000000000000001".into(), entry_file: entry_rel.into(), entry_name: "run_command".into(), entry_kind: EntryKind::Function, lang: Lang::Python, toolchain_id: "python-3.11".into(), payload_slot: PayloadSlot::Param(0), expected_cap: Cap::CODE_EXEC, constraint_hints: vec![], sink_file: entry_rel.into(), sink_line: 18, spec_hash: "phase09testabcd1".into(), derivation: SpecDerivationStrategy::FromCallgraphEntry, stubs_required: vec![], framework: None, java_toolchain: nyx_scanner::dynamic::spec::JavaToolchain::default(), } } fn workdir_size(root: &Path) -> u64 { fn walk(p: &Path) -> u64 { let Ok(meta) = std::fs::metadata(p) else { return 0; }; if meta.is_file() { return meta.len(); } let mut sum = 0; let Ok(entries) = std::fs::read_dir(p) else { return 0; }; for e in entries.flatten() { sum += walk(&e.path()); } sum } walk(root) } #[test] fn capture_returns_three_deps_plus_flask() { let root = fixture_root(); let spec = flask_spec("app.py"); let captured = capture_project_dependencies(&root, &spec); // Direct deps from `app.py`: flask + requests + jinja2 + os (os is // stdlib and dropped at materialize time, but capture preserves it). let names: Vec = captured .direct_deps .iter() .map(|d| d.to_ascii_lowercase()) .collect(); assert!(names.contains(&"flask".to_owned()), "deps = {names:?}"); assert!(names.contains(&"requests".to_owned()), "deps = {names:?}"); assert!(names.contains(&"jinja2".to_owned()), "deps = {names:?}"); // Framework detector picks up Flask from `requirements.txt`. assert!(captured.frameworks.contains(&DetectedFramework::Flask)); // Toolchain pin from `pyproject.toml` (`requires-python = ">=3.11"`). assert_eq!(captured.toolchain.toolchain_id, "python-3.11"); assert!(!captured.toolchain.toolchain_drift); // Manifests resolved: requirements.txt and pyproject.toml. assert!( captured.lockfile.is_some(), "lockfile = {:?}", captured.lockfile ); let manifest_names: Vec = captured .manifests .iter() .filter_map(|p| p.file_name().and_then(|n| n.to_str()).map(String::from)) .collect(); assert!(manifest_names.contains(&"requirements.txt".to_owned())); assert!(manifest_names.contains(&"pyproject.toml".to_owned())); // Config files resolved. let config_names: Vec = captured .config_files .iter() .filter_map(|p| p.file_name().and_then(|n| n.to_str()).map(String::from)) .collect(); assert!(config_names.contains(&"config.yaml".to_owned())); } #[test] fn stage_workdir_emits_entry_manifest_and_config_under_budget() { let root = fixture_root(); let spec = flask_spec("app.py"); let captured = capture_project_dependencies(&root, &spec); let stage = TempDir::new().unwrap(); let env = stage_workdir_full(&captured, stage.path(), &spec.spec_hash, Lang::Python) .expect("stage workdir"); // Entry and manifests landed in the workdir. assert!(env.workdir.join("app.py").is_file()); assert!(env.workdir.join("requirements.txt").is_file()); assert!(env.workdir.join("pyproject.toml").is_file()); assert!(env.workdir.join("config.yaml").is_file()); // The captured workdir respects the 10 MiB bound. let bytes = workdir_size(&env.workdir); assert!( bytes <= MAX_WORKDIR_BYTES, "workdir size {bytes} exceeds budget {MAX_WORKDIR_BYTES}" ); // The original `requirements.txt` from the fixture is preserved // verbatim (capture step does not rewrite it). let staged_req = std::fs::read_to_string(env.workdir.join("requirements.txt")).unwrap(); assert!(staged_req.contains("Flask")); assert!(staged_req.contains("requests")); assert!(staged_req.contains("Jinja2")); } #[test] fn materialize_runtime_synthesises_pinned_manifest() { let root = fixture_root(); let spec = flask_spec("app.py"); let captured = capture_project_dependencies(&root, &spec); let stage = TempDir::new().unwrap(); let env = stage_workdir_full(&captured, stage.path(), &spec.spec_hash, Lang::Python) .expect("stage workdir"); let artifacts = materialize_runtime(&env); assert!( !artifacts.files.is_empty(), "python emitter must materialise a requirements.txt" ); let (rel, content) = artifacts .files .iter() .find(|(rel, _)| rel == "requirements.txt") .expect("requirements.txt artifact"); assert_eq!(rel, "requirements.txt"); let lower = content.to_ascii_lowercase(); assert!(lower.contains("flask")); assert!(lower.contains("requests")); assert!(lower.contains("jinja2")); // spec_hash baked into the header for forensic traceability. assert!(content.contains(&spec.spec_hash)); } fn adapter_bound_spec( lang: Lang, entry_file: &str, adapter: &str, entry_kind: EntryKind, ) -> HarnessSpec { HarnessSpec { finding_id: format!("adapter-{adapter}"), entry_file: entry_file.to_owned(), entry_name: "run".to_owned(), entry_kind: entry_kind.clone(), lang, toolchain_id: match lang { Lang::Python => "python-3.11", Lang::JavaScript | Lang::TypeScript => "node-20", Lang::Java => "java-21", Lang::Go => "go-1.21", Lang::Rust => "rust-stable", Lang::Php => "php-8.2", Lang::Ruby => "ruby-3.2", _ => "toolchain", } .to_owned(), payload_slot: PayloadSlot::Param(0), expected_cap: Cap::CODE_EXEC, constraint_hints: vec![], sink_file: entry_file.to_owned(), sink_line: 1, spec_hash: format!("hash-{adapter}"), derivation: SpecDerivationStrategy::FromFlowSteps, stubs_required: vec![], framework: Some(FrameworkBinding { adapter: adapter.to_owned(), kind: entry_kind, route: None, request_params: vec![], response_writer: None, middleware: vec![], }), java_toolchain: nyx_scanner::dynamic::spec::JavaToolchain::default(), } } #[test] fn materialize_runtime_adds_framework_adapter_deps_without_imports() { let root = TempDir::new().unwrap(); let cases = [ ( Lang::Python, "task.py", "scheduled-celery", EntryKind::ScheduledJob { schedule: Some("* * * * *".to_owned()), }, "requirements.txt", "celery", ), ( Lang::JavaScript, "resolver.js", "graphql-apollo", EntryKind::GraphQLResolver { type_name: "Query".to_owned(), field: "user".to_owned(), }, "package.json", "@apollo/server", ), ( Lang::Ruby, "worker.rb", "scheduled-sidekiq", EntryKind::ScheduledJob { schedule: None }, "Gemfile", "sidekiq", ), ( Lang::Php, "Middleware.php", "middleware-laravel", EntryKind::Middleware { name: "AuthMiddleware".to_owned(), }, "composer.json", "laravel/framework", ), ( Lang::Java, "QuartzJob.java", "scheduled-quartz", EntryKind::ScheduledJob { schedule: None }, "pom.xml", "org.quartz-scheduler", ), ( Lang::Go, "resolver.go", "graphql-gqlgen", EntryKind::GraphQLResolver { type_name: "Query".to_owned(), field: "user".to_owned(), }, "go.mod", "github.com/99designs/gqlgen", ), ( Lang::Rust, "resolver.rs", "graphql-juniper", EntryKind::GraphQLResolver { type_name: "Query".to_owned(), field: "user".to_owned(), }, "Cargo.toml", "juniper = \"0.16\"", ), ]; for (lang, entry_file, adapter, entry_kind, manifest, needle) in cases { std::fs::write(root.path().join(entry_file), "/* marker-only fixture */\n").unwrap(); let spec = adapter_bound_spec(lang, entry_file, adapter, entry_kind); let captured = capture_project_dependencies(root.path(), &spec); let stage = TempDir::new().unwrap(); let env = stage_workdir_full(&captured, stage.path(), &spec.spec_hash, lang) .expect("stage workdir"); let artifacts = materialize_runtime(&env); let (_, content) = artifacts .files .iter() .find(|(rel, _)| rel == manifest) .unwrap_or_else(|| panic!("{adapter} did not materialize {manifest}")); assert!( content.contains(needle), "{adapter} manifest {manifest} missing {needle}: {content}", ); } } #[test] fn workdir_is_importable_when_python_available() { // Acceptance bullet: "the route boots and the verifier reaches the // route handler". Done structurally — the staged workdir is set up // exactly the way the harness would consume it, and a smoke import // checks the entry module loads and exposes the route handler. // // The smoke check is gated on `python3` being installed because the // dynamic verifier itself is gated on the same precondition; bare // CI runners that lack python3 still pass the rest of the suite. let root = fixture_root(); let spec = flask_spec("app.py"); let captured = capture_project_dependencies(&root, &spec); let stage = TempDir::new().unwrap(); let _env = stage_workdir_full(&captured, stage.path(), &spec.spec_hash, Lang::Python) .expect("stage workdir"); // Skip end-to-end import when python3 is absent (matches the dynamic // verifier's behaviour: process backend on hosts without python3 // already reports `Unsupported(BackendUnavailable)`). let has_python3 = std::process::Command::new("python3") .arg("--version") .output() .map(|o| o.status.success()) .unwrap_or(false); if !has_python3 { eprintln!("python3 not on PATH — staging asserts done, end-to-end import skipped"); return; } // Skip if Flask isn't importable on the host. The build-sandbox would // normally pip-install it from `requirements.txt`, but we do not // exercise that path here (Phase 09 — Track D.1 is the capture + // stage pipeline, the pip-install is owned by `build_sandbox`). let has_flask = std::process::Command::new("python3") .args(["-c", "import flask"]) .output() .map(|o| o.status.success()) .unwrap_or(false); if !has_flask { eprintln!("flask not installed on host — staging asserts done, end-to-end import skipped"); return; } let output = std::process::Command::new("python3") .args([ "-c", "import sys; sys.path.insert(0, '.'); import app; assert callable(getattr(app, 'run_command', None)), 'run_command missing'; print('OK')", ]) .current_dir(stage.path()) .output() .expect("invoke python3"); let stdout = String::from_utf8_lossy(&output.stdout); let stderr = String::from_utf8_lossy(&output.stderr); assert!( output.status.success(), "python3 import failed: stdout={stdout} stderr={stderr}" ); assert!(stdout.contains("OK"), "missing OK marker: {stdout}"); } #[test] fn callgraph_context_extends_source_closure() { // Sanity check the Phase 09 closure path: when summaries + callgraph // are threaded in, the staged workdir contains every file the // reverse-edge walk discovered (here just one file because the // fixture is single-file). use nyx_scanner::ast::analyse_file_fused; use nyx_scanner::callgraph::build_call_graph; use nyx_scanner::summary::GlobalSummaries; use nyx_scanner::utils::config::{AnalysisMode, Config}; let mut cfg = Config::default(); cfg.scanner.mode = AnalysisMode::Full; cfg.scanner.read_vcsignore = false; cfg.scanner.require_git_to_read_vcsignore = false; cfg.performance.worker_threads = Some(1); let root = fixture_root(); let app = root.join("app.py"); let bytes = std::fs::read(&app).unwrap(); let result = analyse_file_fused(&bytes, &app, &cfg, None, Some(&root)).expect("analyse fixture"); let root_str = root.to_string_lossy(); let mut gs = GlobalSummaries::new(); for s in result.summaries { let key = s.func_key(Some(&root_str)); gs.insert(key, s); } for (key, ssa) in result.ssa_summaries { gs.insert_ssa(key, ssa); } let cg = build_call_graph(&gs, &[]); let spec = flask_spec("app.py"); let captured = capture_project_dependencies_with_context(&root, &spec, Some(&gs), Some(&cg)); assert!( captured .source_closure .iter() .any(|p| p.ends_with("app.py")), "source closure must include app.py: {:?}", captured.source_closure ); }