[pitboss] phase 04: Track A.4 — Callgraph-aware spec entry-point resolution

This commit is contained in:
pitboss 2026-05-14 04:20:26 -05:00
parent 3b660ba1d3
commit 780dc9099c
9 changed files with 618 additions and 4 deletions

View file

@ -52,6 +52,7 @@ pub struct AmbiguousCallee {
///
/// Nodes are [`FuncKey`]s (one per function definition across all files).
/// Edges represent call-site relationships resolved after pass 1.
#[derive(Debug)]
pub struct CallGraph {
pub graph: DiGraph<FuncKey, CallEdge>,
/// `FuncKey → NodeIndex` for quick lookup.

View file

@ -373,6 +373,22 @@ fn load_verify_summaries(
Some(Arc::new(crate::summary::merge_summaries(all, Some(&root_str))))
}
/// Build the whole-program [`crate::callgraph::CallGraph`] from a
/// preloaded [`crate::summary::GlobalSummaries`] so the verifier can
/// thread it into the callgraph-aware spec-derivation path
/// (`SpecDerivationStrategy::FromCallgraphEntry`).
///
/// Best-effort: callgraph construction itself never fails, but this
/// helper exists to keep the verify pipeline parallel with
/// [`load_verify_summaries`] and to absorb future failure modes (e.g.
/// interop-edge loading) behind a single optional return.
#[cfg(feature = "dynamic")]
fn load_verify_callgraph(
summaries: &crate::summary::GlobalSummaries,
) -> Arc<crate::callgraph::CallGraph> {
Arc::new(crate::callgraph::build_call_graph(summaries, &[]))
}
/// Entry point called by the CLI.
#[allow(clippy::too_many_arguments)]
pub fn handle(
@ -529,6 +545,12 @@ pub fn handle(
// without re-hitting SQLite per finding. Best-effort: a load
// failure logs and falls through to the substring heuristics.
opts.summaries = load_verify_summaries(&project_name, &db_path, &scan_path);
// Build the whole-program callgraph from the preloaded summaries
// so strategy 4 can walk reverse edges to a route handler / CLI
// entry when the sink lives in a leaf helper.
if let Some(ref s) = opts.summaries {
opts.callgraph = Some(load_verify_callgraph(s));
}
}
for diag in &mut diags {
let result = crate::dynamic::verify::verify_finding(diag, &opts);

View file

@ -17,13 +17,15 @@
//! meaning, the hash inputs change, or the corpus changes in a way that
//! would invalidate previously-computed hashes.
use crate::callgraph::{CallGraph, CallGraphAnalysis};
use crate::commands::scan::Diag;
use crate::dynamic::corpus::CORPUS_VERSION;
use crate::evidence::{Confidence, FlowStepKind, UnsupportedReason};
use crate::labels::Cap;
use crate::summary::{FuncSummary, GlobalSummaries};
use crate::symbol::Lang;
use crate::symbol::{FuncKey, Lang};
use serde::{Deserialize, Serialize};
use std::collections::{HashSet, VecDeque};
use std::path::Path;
/// Re-export of the always-present [`crate::evidence::SpecDerivationStrategy`].
@ -177,6 +179,33 @@ impl HarnessSpec {
diag: &Diag,
verify_all_confidence: bool,
summaries: Option<&GlobalSummaries>,
) -> Result<Self, UnsupportedReason> {
Self::from_finding_full(diag, verify_all_confidence, summaries, None)
}
/// Strategy-aware constructor that also consults a whole-program
/// [`CallGraph`] when `callgraph` is `Some`.
///
/// Strategy 4 ([`SpecDerivationStrategy::FromCallgraphEntry`]) walks
/// reverse call-graph edges from the sink's enclosing function via
/// [`crate::callgraph::callers_of`] to discover the *nearest* ancestor
/// that qualifies as an entry point (see [`is_entry_point`]). When
/// found, the spec's `entry_file` / `entry_name` are rewritten to the
/// ancestor and `entry_kind` is classified from the ancestor's
/// [`FuncSummary::entry_kind`] — capturing every framework-bound sink
/// whose only real caller is a route decorator or CLI subcommand.
///
/// When `callgraph` is `None` the behaviour matches
/// [`HarnessSpec::from_finding_with_summaries`] verbatim: strategy 4
/// falls back to the rule-id substring / summary-entry-kind path.
/// When `summaries` is `None` the callgraph walk has no per-key
/// summary to consult and degrades to a name-based entry recogniser
/// (`main` / `__main__`).
pub fn from_finding_full(
diag: &Diag,
verify_all_confidence: bool,
summaries: Option<&GlobalSummaries>,
callgraph: Option<&CallGraph>,
) -> Result<Self, UnsupportedReason> {
if !verify_all_confidence {
match diag.confidence {
@ -187,6 +216,18 @@ impl HarnessSpec {
let evidence = diag.evidence.as_ref().ok_or(UnsupportedReason::NoFlowSteps)?;
// Phase 04 pre-step: when both callgraph *and* summaries are
// present, walk reverse edges to a framework-bound ancestor.
// Takes precedence over the four-strategy ladder because a route
// handler / CLI entry is always a stronger driving anchor than
// the helper function that physically contains the sink.
if let (Some(s), Some(cg)) = (summaries, callgraph) {
if let Some(spec) = derive_from_callgraph_entry_full(diag, evidence, Some(s), Some(cg))
{
return Ok(spec);
}
}
// Try each strategy in priority order; first non-None wins.
if let Some(spec) = derive_from_flow_steps(diag, evidence) {
return Ok(spec);
@ -197,13 +238,35 @@ impl HarnessSpec {
if let Some(spec) = derive_from_func_summary_auto(diag, evidence, summaries) {
return Ok(spec);
}
if let Some(spec) = derive_from_callgraph_entry_with(diag, evidence, summaries) {
if let Some(spec) = derive_from_callgraph_entry_full(diag, evidence, summaries, callgraph)
{
return Ok(spec);
}
Err(UnsupportedReason::SpecDerivationFailed)
}
/// Convenience wrapper around [`HarnessSpec::from_finding_full`] that
/// pins `verify_all_confidence = false` and accepts only callgraph
/// context. Used by the verifier when the caller has built a fresh
/// [`CallGraph`] but not yet plumbed the matching
/// [`GlobalSummaries`]; in that mode the callgraph walk degrades to
/// the name-based entry recogniser.
///
/// The `analysis` argument is accepted to pin the API surface against
/// future SCC-aware refinements (e.g. bounding the reverse-edge BFS
/// against the analysis's pre-computed back edges); the current
/// implementation does not consult it because the BFS already
/// protects against recursive predecessor chains via its visited
/// set.
pub fn from_finding_with_callgraph(
diag: &Diag,
callgraph: &CallGraph,
_analysis: &CallGraphAnalysis,
) -> Result<Self, UnsupportedReason> {
Self::from_finding_full(diag, false, None, Some(callgraph))
}
/// True when [`HarnessSpec::entry_kind`] is in
/// [`crate::dynamic::lang::entry_kinds_supported`] for [`HarnessSpec::lang`].
///
@ -449,6 +512,26 @@ pub fn derive_from_callgraph_entry_with(
diag: &Diag,
evidence: &crate::evidence::Evidence,
summaries: Option<&GlobalSummaries>,
) -> Option<HarnessSpec> {
derive_from_callgraph_entry_full(diag, evidence, summaries, None)
}
/// Like [`derive_from_callgraph_entry_with`], but also consults the
/// whole-program [`CallGraph`] when `callgraph` is `Some`.
///
/// When both `summaries` and `callgraph` are present, the sink's
/// enclosing function is resolved to a [`FuncKey`] and a reverse-edge
/// BFS walks predecessors until an ancestor satisfies
/// [`is_entry_point`]. The spec's `entry_file` / `entry_name` are
/// rewritten to that ancestor and `entry_kind` is classified from the
/// ancestor's [`FuncSummary::entry_kind`] (HTTP variants → HttpRoute).
/// The legacy rule-id `.http.` / `.cli.` substring fallback is still
/// consulted when the callgraph walk finds nothing.
pub fn derive_from_callgraph_entry_full(
diag: &Diag,
evidence: &crate::evidence::Evidence,
summaries: Option<&GlobalSummaries>,
callgraph: Option<&CallGraph>,
) -> Option<HarnessSpec> {
let lang = lang_from_path(&diag.path)?;
let expected_cap = Cap::from_bits_truncate(evidence.sink_caps);
@ -456,7 +539,38 @@ pub fn derive_from_callgraph_entry_with(
return None;
}
// Step 1: try summary-based classification.
// Step 0: callgraph-aware reverse-edge walk to the nearest entry-point
// ancestor. Only fires when both summaries *and* callgraph are present.
if let (Some(s), Some(cg)) = (summaries, callgraph) {
if let Some(found) = find_entry_via_callgraph(diag, evidence, s, cg, lang) {
let entry_kind = found
.summary
.entry_kind
.as_ref()
.map(entry_kind_from_summary)
.unwrap_or_else(|| name_to_entry_kind(&found.summary.name));
let entry_file = if !found.summary.file_path.is_empty() {
found.summary.file_path.clone()
} else {
diag.path.clone()
};
let mut spec = finalize_spec(
diag,
entry_file,
found.summary.name.clone(),
lang,
expected_cap,
diag.path.clone(),
diag.line as u32,
SpecDerivationStrategy::FromCallgraphEntry,
);
spec.entry_kind = entry_kind;
spec.spec_hash = compute_spec_hash(&spec);
return Some(spec);
}
}
// Step 1: try summary-based classification of the enclosing function.
let summary_kind = enclosing_function_from_flow_steps(evidence)
.and_then(|name| find_summary_by_path(summaries?, lang, &name, &diag.path))
.and_then(|s| s.entry_kind.as_ref().map(entry_kind_from_summary));
@ -491,6 +605,140 @@ pub fn derive_from_callgraph_entry_with(
Some(spec)
}
/// Recognise function-name-only entry points when no static
/// [`crate::entry_points::EntryKind`] tag is available.
///
/// `main` / `fn main` / `__main__` (Python's `if __name__ == "__main__":`
/// block-as-function convention) become [`EntryKind::CliSubcommand`];
/// every other name defaults to [`EntryKind::Function`]. Used to give
/// the verifier a non-`Function` entry kind for callgraph-discovered
/// ancestors whose summaries pre-date the static entry-kind detector.
fn name_to_entry_kind(name: &str) -> EntryKind {
match name {
"main" | "__main__" => EntryKind::CliSubcommand,
_ => EntryKind::Function,
}
}
/// True when `func` qualifies as a static entry point: framework-bound
/// route handler (`func.entry_kind.is_some()`), Rust / C-style program
/// `main`, or Python `__main__` block-as-function.
///
/// `callgraph` is accepted as future-extension surface (e.g. checking
/// in-degree == 0 to claim externally-driven CLI helpers) but the
/// current implementation only uses it for the in-degree heuristic when
/// the function name itself does not match a recognised pattern.
pub fn is_entry_point(func: &FuncSummary, callgraph: &CallGraph) -> bool {
if func.entry_kind.is_some() {
return true;
}
if matches!(func.name.as_str(), "main" | "__main__") {
return true;
}
// Last-resort: if the call graph has zero static callers for this
// function and it is *not* a closure / lambda (which legitimately
// have zero callers but are inlined at their use site), treat it as
// externally driven. We only claim this when the function lives at
// file top level (empty container) so we do not promote leaf helper
// methods on classes to entry points.
if !func.container.is_empty() {
return false;
}
let lang = match Lang::from_slug(&func.lang) {
Some(l) => l,
None => return false,
};
let key = FuncKey {
lang,
namespace: func.file_path.clone(),
container: func.container.clone(),
name: func.name.clone(),
arity: Some(func.param_count),
disambig: func.disambig,
kind: func.kind,
};
if let Some(&node) = callgraph.index.get(&key) {
callgraph
.graph
.neighbors_directed(node, petgraph::Direction::Incoming)
.next()
.is_none()
} else {
false
}
}
/// Result of a successful callgraph-driven entry-point lookup.
struct EntryHit<'a> {
#[allow(dead_code)]
key: FuncKey,
summary: &'a FuncSummary,
}
/// Walk reverse edges from the sink's enclosing function until an entry
/// point is found.
///
/// Returns `None` when:
/// * the sink's enclosing function cannot be resolved from
/// `evidence.flow_steps`, or
/// * the resolved function has no node in the callgraph (e.g. defined
/// in a file pass 1 did not summarise), or
/// * no ancestor satisfies [`is_entry_point`] within the BFS frontier.
fn find_entry_via_callgraph<'a>(
diag: &Diag,
evidence: &crate::evidence::Evidence,
summaries: &'a GlobalSummaries,
callgraph: &CallGraph,
lang: Lang,
) -> Option<EntryHit<'a>> {
let enclosing = enclosing_function_from_flow_steps(evidence)
.or_else(|| resolve_enclosing_function(diag, evidence, Some(summaries), lang))?;
// Locate the FuncKey by matching name + file_path against the summaries.
let (sink_key, sink_summary) = summaries
.iter()
.find(|(k, s)| {
k.lang == lang && s.name == enclosing && paths_match(&s.file_path, &diag.path)
})
.map(|(k, s)| (k.clone(), s))?;
// Sink's own enclosing function may itself be an entry (route
// handler that contains the sink directly). When that is the case
// the existing summary-classification path already returns the
// right answer, but seeding the BFS with it keeps the two paths
// consistent.
let start = *callgraph.index.get(&sink_key)?;
if is_entry_point(sink_summary, callgraph) {
return Some(EntryHit {
key: sink_key,
summary: sink_summary,
});
}
let mut visited: HashSet<petgraph::graph::NodeIndex> = HashSet::new();
visited.insert(start);
let mut queue: VecDeque<petgraph::graph::NodeIndex> = VecDeque::new();
queue.push_back(start);
while let Some(node) = queue.pop_front() {
for caller_node in callgraph
.graph
.neighbors_directed(node, petgraph::Direction::Incoming)
{
if !visited.insert(caller_node) {
continue;
}
let caller_key = &callgraph.graph[caller_node];
if let Some(caller_summary) = summaries.get(caller_key) {
if is_entry_point(caller_summary, callgraph) {
return Some(EntryHit {
key: caller_key.clone(),
summary: caller_summary,
});
}
}
queue.push_back(caller_node);
}
}
None
}
/// Map a static-analysis [`crate::entry_points::EntryKind`] (route shape) onto
/// the dynamic-side [`EntryKind`] taxonomy. Every current variant of the
/// static enum describes an HTTP route handler — no CLI / library-API

View file

@ -3,6 +3,7 @@
//! The CLI subcommand and any library consumer call [`verify_finding`].
//! It is the only function the rest of the crate needs to know about.
use crate::callgraph::CallGraph;
use crate::commands::scan::Diag;
use crate::dynamic::corpus::{payloads_for, CORPUS_VERSION};
use crate::dynamic::report::{AttemptSummary, VerifyResult, VerifyStatus};
@ -41,6 +42,14 @@ pub struct VerifyOptions {
/// `None` disables the summary-driven derivation paths; strategy 3 is a
/// no-op and strategy 4 falls back to the rule-id substring heuristic.
pub summaries: Option<Arc<GlobalSummaries>>,
/// Whole-program [`CallGraph`] threaded into the callgraph-aware
/// branch of strategy 4 ([`SpecDerivationStrategy::FromCallgraphEntry`]).
///
/// When present alongside [`Self::summaries`], the verifier walks
/// reverse edges from the sink's enclosing function to the nearest
/// entry-point ancestor (route handler, CLI subcommand, `main`).
/// `None` keeps strategy 4 on the legacy rule-id substring path.
pub callgraph: Option<Arc<CallGraph>>,
}
impl VerifyOptions {
@ -61,6 +70,7 @@ impl VerifyOptions {
db_path: None,
verify_all_confidence: config.scanner.verify_all_confidence,
summaries: None,
callgraph: None,
}
}
}
@ -322,10 +332,11 @@ fn derivation_failure_hint(diag: &Diag) -> String {
pub fn verify_finding(diag: &Diag, opts: &VerifyOptions) -> VerifyResult {
let finding_id = format!("{:016x}", diag.stable_hash);
let spec = match HarnessSpec::from_finding_with_summaries(
let spec = match HarnessSpec::from_finding_full(
diag,
opts.verify_all_confidence,
opts.summaries.as_deref(),
opts.callgraph.as_deref(),
) {
Ok(s) => s,
Err(reason) => {

View file

@ -0,0 +1,28 @@
// Phase 04 fixture: Express route handler is a named function bound at
// `app.post`; it calls a helper that holds the sink. The callgraph-aware
// spec-derivation path must rewrite the harness entry to the route
// handler `runCommand`, not the helper `execHelper`.
//
// `runCommand` reads `req.body.cmd` into a local before dispatching to
// `execHelper`. Threading the local through gives the JS callee
// extractor a clean call shape (bare identifier in argument position)
// so the call-graph picks up the `runCommand → execHelper` edge.
const express = require("express");
const { exec } = require("child_process");
const app = express();
function execHelper(cmd) {
exec(cmd); // sink: command injection
}
function runCommand(req, res) {
const cmd = req.body.cmd;
execHelper(cmd);
res.send("ok");
}
app.post("/run", runCommand);
module.exports = app;

View file

@ -0,0 +1,21 @@
# Phase 04 fixture: sink in a helper function called only from a Flask
# route handler. The callgraph-aware spec-derivation path must rewrite
# the harness entry to the route handler `run_command` (entry-point
# ancestor with `entry_kind = FlaskRoute`), not the helper `_execute`
# where the sink physically lives.
from flask import Flask, request
app = Flask(__name__)
def _execute(cmd):
import os
os.system(cmd) # sink: command injection
@app.route("/run", methods=["POST"])
def run_command():
cmd = request.form.get("cmd", "")
_execute(cmd)
return "ok"

View file

@ -0,0 +1,23 @@
// Phase 04 fixture: Spring controller method calls a helper that holds
// the sink. The callgraph-aware spec-derivation path must rewrite the
// harness entry to the controller method `runCommand`, not the helper
// `execHelper`.
package fixture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SinkController {
private void execHelper(String cmd) throws Exception {
Runtime.getRuntime().exec(cmd); // sink: command injection
}
@PostMapping("/run")
public String runCommand(@RequestBody String cmd) throws Exception {
execHelper(cmd);
return "ok";
}
}

View file

@ -106,6 +106,7 @@ mod parity_tests {
db_path: None,
verify_all_confidence: false,
summaries: None,
callgraph: None,
}
}
@ -120,6 +121,7 @@ mod parity_tests {
db_path: None,
verify_all_confidence: false,
summaries: None,
callgraph: None,
}
}

View file

@ -0,0 +1,258 @@
//! Phase 04 acceptance: callgraph-aware
//! [`SpecDerivationStrategy::FromCallgraphEntry`].
//!
//! Each fixture under `tests/dynamic_fixtures/callgraph_entry/` puts a
//! sink inside a leaf helper whose only static caller is a framework
//! entry point (Flask route, Express handler, Spring controller).
//! Without the callgraph walk, strategy 4 would name the helper itself
//! as the harness entry — the spec would then fail to build a runnable
//! harness because the helper is never externally invoked. With the
//! callgraph walk, the spec's `entry_name` rewrites to the framework
//! handler that wraps the helper, and `entry_kind` becomes
//! `EntryKind::HttpRoute`.
#![cfg(feature = "dynamic")]
use nyx_scanner::ast::analyse_file_fused;
use nyx_scanner::callgraph::{analyse, build_call_graph, CallGraph, CallGraphAnalysis};
use nyx_scanner::commands::scan::Diag;
use nyx_scanner::dynamic::spec::{
is_entry_point, EntryKind, HarnessSpec, SpecDerivationStrategy,
};
use nyx_scanner::evidence::{Confidence, Evidence, FlowStep, FlowStepKind};
use nyx_scanner::labels::Cap;
use nyx_scanner::patterns::{FindingCategory, Severity};
use nyx_scanner::summary::GlobalSummaries;
use nyx_scanner::utils::config::{AnalysisMode, Config};
use std::path::{Path, PathBuf};
fn fixtures_dir() -> PathBuf {
Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join("dynamic_fixtures")
.join("callgraph_entry")
}
fn test_config() -> Config {
let mut cfg = Config::default();
cfg.scanner.mode = AnalysisMode::Full;
cfg.scanner.read_vcsignore = false;
cfg.scanner.require_git_to_read_vcsignore = false;
cfg.performance.worker_threads = Some(1);
cfg
}
/// Replay pass 1 on a single fixture file, returning the resulting
/// `GlobalSummaries` + whole-program `CallGraph` + `CallGraphAnalysis`.
fn build_context(file: &Path) -> (GlobalSummaries, CallGraph, CallGraphAnalysis) {
let cfg = test_config();
let root = file.parent().unwrap();
let root_str = root.to_string_lossy();
let bytes = std::fs::read(file).expect("read fixture");
let result = analyse_file_fused(&bytes, file, &cfg, None, Some(root))
.expect("analyse fixture");
let mut gs = GlobalSummaries::new();
for s in result.summaries {
let key = s.func_key(Some(&root_str));
gs.insert(key, s);
}
for (key, ssa) in result.ssa_summaries {
gs.insert_ssa(key, ssa);
}
let cg = build_call_graph(&gs, &[]);
let analysis = analyse(&cg);
(gs, cg, analysis)
}
fn make_diag(id: &str, path: &str, line: usize) -> Diag {
Diag {
path: path.into(),
line,
col: 0,
severity: Severity::High,
id: id.into(),
category: FindingCategory::Security,
path_validated: false,
guard_kind: None,
message: None,
labels: vec![],
confidence: Some(Confidence::High),
evidence: Some(Evidence::default()),
rank_score: None,
rank_reason: None,
suppressed: false,
suppression: None,
rollup: None,
finding_id: String::new(),
alternative_finding_ids: vec![],
stable_hash: 0,
}
}
fn sink_step_in(file: &str, function: &str, line: usize) -> FlowStep {
FlowStep {
step: 1,
kind: FlowStepKind::Sink,
file: file.into(),
line: line as u32,
col: 0,
snippet: None,
variable: None,
callee: None,
function: Some(function.into()),
is_cross_file: false,
}
}
/// Helper: assert that strategy 4 with the callgraph rewrites the
/// entry to a framework-bound ancestor.
fn assert_callgraph_rewrites_entry(
fixture: &str,
helper: &str,
expected_entry: &str,
sink_line: usize,
cap: Cap,
rule_id: &str,
) {
let file = fixtures_dir().join(fixture);
let file_str = file.to_string_lossy().to_string();
let (summaries, cg, analysis) = build_context(&file);
// Sanity: pass 1 saw both functions.
let names: Vec<String> = summaries.iter().map(|(_, s)| s.name.clone()).collect();
assert!(
names.iter().any(|n| n == helper),
"pass 1 must summarise helper `{helper}` in {fixture}; got {names:?}"
);
assert!(
names.iter().any(|n| n == expected_entry),
"pass 1 must summarise entry `{expected_entry}` in {fixture}; got {names:?}"
);
// Build a synthetic diag pointing at the helper.
let mut diag = make_diag(rule_id, &file_str, sink_line);
let mut ev = Evidence::default();
ev.flow_steps = vec![sink_step_in(&file_str, helper, sink_line)];
ev.sink_caps = cap.bits();
diag.evidence = Some(ev);
// Without callgraph: strategy 4 either bails or names the helper.
let baseline = HarnessSpec::from_finding_with_summaries(&diag, false, Some(&summaries));
if let Ok(ref s) = baseline {
assert_ne!(
s.entry_name, expected_entry,
"baseline (no callgraph) must not already rewrite the entry — \
otherwise the callgraph path is untested"
);
}
// With callgraph: entry is rewritten to the framework handler.
let spec = HarnessSpec::from_finding_full(&diag, false, Some(&summaries), Some(&cg))
.expect("callgraph-aware derivation must succeed");
assert_eq!(
spec.derivation,
SpecDerivationStrategy::FromCallgraphEntry,
"callgraph-walked spec must record FromCallgraphEntry"
);
assert_eq!(
spec.entry_name, expected_entry,
"callgraph walk must rewrite entry to the framework handler"
);
assert!(
matches!(spec.entry_kind, EntryKind::HttpRoute),
"callgraph walk must classify the entry as HttpRoute; got {:?}",
spec.entry_kind
);
assert_eq!(spec.expected_cap, cap);
let _ = analysis; // accepted but not asserted on here.
}
// ── Per-language fixtures ────────────────────────────────────────────────────
#[test]
fn flask_route_helper_sink_rewrites_to_route_handler() {
assert_callgraph_rewrites_entry(
"flask_route_sink.py",
"_execute",
"run_command",
13,
Cap::SHELL_ESCAPE,
"py.cmdi.os_system",
);
}
#[test]
fn express_handler_helper_sink_rewrites_to_route_handler() {
assert_callgraph_rewrites_entry(
"express_handler_sink.js",
"execHelper",
"runCommand",
17,
Cap::SHELL_ESCAPE,
"js.cmdi.exec",
);
}
#[test]
fn spring_controller_helper_sink_rewrites_to_controller_method() {
assert_callgraph_rewrites_entry(
"spring_controller_sink.java",
"execHelper",
"runCommand",
15,
Cap::SHELL_ESCAPE,
"java.cmdi.runtime_exec",
);
}
// ── `is_entry_point` direct coverage ─────────────────────────────────────────
#[test]
fn is_entry_point_recognises_route_decorator() {
let file = fixtures_dir().join("flask_route_sink.py");
let (summaries, cg, _analysis) = build_context(&file);
let handler = summaries
.iter()
.find(|(_, s)| s.name == "run_command")
.map(|(_, s)| s)
.expect("Flask route handler must be summarised");
assert!(
is_entry_point(handler, &cg),
"Flask-decorated function must qualify as an entry point"
);
let helper = summaries
.iter()
.find(|(_, s)| s.name == "_execute")
.map(|(_, s)| s)
.expect("helper must be summarised");
// The helper has a static caller and no entry_kind, so it must not
// be classified as an entry point.
assert!(
!is_entry_point(helper, &cg),
"helper with static caller and no entry_kind must not be an entry point"
);
}
#[test]
fn from_finding_with_callgraph_thin_wrapper_compiles_and_runs() {
// Smoke test for the literal-plan signature. Without summaries the
// wrapper degrades to the legacy substring path; this asserts the
// entry point is callable and returns a spec for a `.http.` rule.
let mut diag = make_diag(
"py.http.flask_route",
"tests/dynamic_fixtures/callgraph_entry/flask_route_sink.py",
15,
);
let mut ev = Evidence::default();
ev.sink_caps = Cap::SHELL_ESCAPE.bits();
diag.evidence = Some(ev);
let file = fixtures_dir().join("flask_route_sink.py");
let (_summaries, cg, analysis) = build_context(&file);
let spec = HarnessSpec::from_finding_with_callgraph(&diag, &cg, &analysis)
.expect("wrapper must derive a spec via the rule-id fallback");
assert_eq!(spec.derivation, SpecDerivationStrategy::FromCallgraphEntry);
assert!(matches!(spec.entry_kind, EntryKind::HttpRoute));
}