[pitboss] phase 01: Track A.1 — Spec derivation strategy enum + flow-steps-optional fallback

This commit is contained in:
pitboss 2026-05-13 13:03:44 -04:00
parent 5909fa8c5d
commit 31d9ef725a
21 changed files with 1106 additions and 62 deletions

View file

@ -1 +0,0 @@
{"sessionId":"3b3f9549-dbfc-4df7-8b4d-2b6393536381","pid":19723,"procStart":"Tue May 12 19:32:36 2026","acquiredAt":1778614799698}

View file

@ -191,6 +191,7 @@ mod tests {
sink_file: "main.c".into(),
sink_line: 5,
spec_hash: "0000000000000000".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
};
let err = build(&spec).unwrap_err();
assert!(matches!(err, HarnessError::Unsupported(_)));
@ -211,6 +212,7 @@ mod tests {
sink_file: "src/app.py".into(),
sink_line: 10,
spec_hash: "test0000abcd1234".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
};
let harness = build(&spec).unwrap();
assert!(harness.workdir.join("harness.py").exists());

View file

@ -150,6 +150,7 @@ mod tests {
sink_file: "cmd/server/main.go".into(),
sink_line: 20,
spec_hash: "go0000000000001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}

View file

@ -138,6 +138,7 @@ mod tests {
sink_file: "src/main/java/App.java".into(),
sink_line: 25,
spec_hash: "java00000000001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}

View file

@ -181,6 +181,7 @@ mod tests {
sink_file: "src/app.js".into(),
sink_line: 15,
spec_hash: "js000000000001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}

View file

@ -141,6 +141,7 @@ mod tests {
sink_file: "src/login.php".into(),
sink_line: 10,
spec_hash: "php0000000000001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}

View file

@ -201,6 +201,7 @@ mod tests {
sink_file: "src/app.py".into(),
sink_line: 15,
spec_hash: "00000000deadbeef".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}

View file

@ -179,6 +179,7 @@ mod tests {
sink_file: "src/handler.rs".into(),
sink_line: 10,
spec_hash: "rusttest00000001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}

View file

@ -28,6 +28,42 @@
//!
//! Off by default. Enable with `--features dynamic`. Heavy deps (container
//! runtime client, fuzzer harness) live behind the same gate.
//!
//! # Spec derivation strategies
//!
//! [`spec::HarnessSpec::from_finding_opts`] tries a fixed-order pipeline of
//! [`spec::SpecDerivationStrategy`] candidates and returns the first one that
//! produces a runnable spec. Ordering is deliberately chosen so the cheapest,
//! most-precise sources fire first:
//!
//! 1. [`SpecDerivationStrategy::FromFlowSteps`] — the original derivation
//! path. Walks `evidence.flow_steps` for the outermost `Source` and uses
//! its enclosing function as the entry. Fires for taint findings with a
//! real cross-function flow.
//! 2. [`SpecDerivationStrategy::FromRuleNamespace`] — consumes the diag's
//! rule id (`py.cmdi.os_system`, `java.deser.readobject`,
//! `rs.auth.missing_ownership_check.taint`) plus `evidence.sink_caps` to
//! synthesize a single-step flow. Fires for AST/CFG findings whose rule
//! namespace identifies the sink class.
//! 3. [`SpecDerivationStrategy::FromFuncSummaryWalk`] — walks a
//! [`crate::summary::FuncSummary`] for the sink's enclosing function and
//! picks a `tainted_sink_params` entry. Currently only fires when a
//! summary is threaded in by the caller; the default verifier path does
//! not.
//! 4. [`SpecDerivationStrategy::FromCallgraphEntry`] — last-chance heuristic
//! that treats `*.http.*` and `*.cli.*` rule ids as entry-point findings.
//!
//! When every strategy returns `None`, [`verify::verify_finding`] decides
//! whether to lift the failure to
//! [`crate::evidence::InconclusiveReason::SpecDerivationFailed`] (the finding
//! had derivable signal but no strategy matched) or to keep it as
//! [`crate::evidence::UnsupportedReason::SpecDerivationFailed`] (genuinely
//! unmodellable).
//!
//! [`SpecDerivationStrategy::FromFlowSteps`]: spec::SpecDerivationStrategy::FromFlowSteps
//! [`SpecDerivationStrategy::FromRuleNamespace`]: spec::SpecDerivationStrategy::FromRuleNamespace
//! [`SpecDerivationStrategy::FromFuncSummaryWalk`]: spec::SpecDerivationStrategy::FromFuncSummaryWalk
//! [`SpecDerivationStrategy::FromCallgraphEntry`]: spec::SpecDerivationStrategy::FromCallgraphEntry
pub mod build_sandbox;
pub mod corpus;

View file

@ -392,6 +392,7 @@ mod tests {
sink_file: "app.py".into(),
sink_line: 10,
spec_hash: "cafecafecafe0001".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}

View file

@ -19,12 +19,20 @@
use crate::commands::scan::Diag;
use crate::dynamic::corpus::CORPUS_VERSION;
use crate::evidence::{Confidence, FlowStepKind, UnsupportedReason};
use crate::evidence::{Confidence, FlowStep, FlowStepKind, UnsupportedReason};
use crate::labels::Cap;
use crate::summary::FuncSummary;
use crate::symbol::Lang;
use serde::{Deserialize, Serialize};
use std::path::Path;
/// Re-export of the always-present [`crate::evidence::SpecDerivationStrategy`].
///
/// The canonical definition lives in `evidence.rs` so that
/// [`crate::evidence::InconclusiveReason::SpecDerivationFailed`] can carry a
/// `Vec` of attempted strategies without depending on the `dynamic` feature.
pub use crate::evidence::SpecDerivationStrategy;
/// Bump whenever [`HarnessSpec`] fields change meaning or the spec hash
/// inputs change. Downstream tools should reject specs with an unrecognised
/// version.
@ -101,6 +109,15 @@ pub struct HarnessSpec {
/// Blake3 hash (16 hex chars) of the spec's key fields, version-pinned.
/// Stable across identical specs; used for deduplication and caching.
pub spec_hash: String,
/// Which derivation strategy produced this spec. Populated by
/// [`HarnessSpec::from_finding_opts`]; default for backward compatibility
/// with deserialised specs that pre-date the typed strategy.
#[serde(default = "default_derivation_strategy")]
pub derivation: SpecDerivationStrategy,
}
fn default_derivation_strategy() -> SpecDerivationStrategy {
SpecDerivationStrategy::FromFlowSteps
}
impl HarnessSpec {
@ -120,11 +137,27 @@ impl HarnessSpec {
/// Like `from_finding`, but with `verify_all_confidence=true` the
/// `Confidence >= Medium` gate is skipped so low-confidence findings
/// are also attempted.
///
/// Returns `Err(UnsupportedReason::ConfidenceTooLow)` immediately when
/// the confidence gate fails. Otherwise tries each
/// [`SpecDerivationStrategy`] in order:
/// [`SpecDerivationStrategy::FromFlowSteps`],
/// [`SpecDerivationStrategy::FromRuleNamespace`],
/// [`SpecDerivationStrategy::FromFuncSummaryWalk`],
/// [`SpecDerivationStrategy::FromCallgraphEntry`]. The first non-error
/// strategy wins and its tag is stored on `spec.derivation`.
///
/// Returns `Err(UnsupportedReason::NoFlowSteps)` only when no evidence is
/// present at all. When evidence exists but every strategy fails, the
/// caller is expected to surface the failure as
/// [`crate::evidence::InconclusiveReason::SpecDerivationFailed`] —
/// this method returns `Err(UnsupportedReason::SpecDerivationFailed)`
/// in that case, and `verify_finding` decides whether to lift it to
/// `Inconclusive` based on whether any strategy was actually tried.
pub fn from_finding_opts(
diag: &Diag,
verify_all_confidence: bool,
) -> Result<Self, UnsupportedReason> {
// Require at least Medium confidence unless caller opts out.
if !verify_all_confidence {
match diag.confidence {
Some(c) if c >= Confidence::Medium => {}
@ -134,53 +167,355 @@ impl HarnessSpec {
let evidence = diag.evidence.as_ref().ok_or(UnsupportedReason::NoFlowSteps)?;
if evidence.flow_steps.is_empty() {
return Err(UnsupportedReason::NoFlowSteps);
// Try each strategy in priority order; first non-None wins.
if let Some(spec) = derive_from_flow_steps(diag, evidence) {
return Ok(spec);
}
if let Some(spec) = derive_from_rule_namespace(diag, evidence) {
return Ok(spec);
}
if let Some(spec) = derive_from_func_summary(diag, evidence, None) {
return Ok(spec);
}
if let Some(spec) = derive_from_callgraph_entry(diag, evidence) {
return Ok(spec);
}
let entry = outermost_entry(&evidence.flow_steps)
.ok_or(UnsupportedReason::SpecDerivationFailed)?;
let ext = Path::new(&entry.file)
.extension()
.and_then(|e| e.to_str())
.unwrap_or("");
let lang = Lang::from_extension(ext).ok_or(UnsupportedReason::SpecDerivationFailed)?;
let expected_cap = Cap::from_bits_truncate(evidence.sink_caps);
if expected_cap.is_empty() {
return Err(UnsupportedReason::SpecDerivationFailed);
}
let toolchain_id = toolchain_id_for_lang(lang).to_owned();
// Sink location: prefer explicit sink step; fall back to diag location.
let (sink_file, sink_line) = evidence
.flow_steps
.iter()
.rev()
.find(|s| matches!(s.kind, FlowStepKind::Sink))
.map(|s| (s.file.clone(), s.line))
.unwrap_or_else(|| (diag.path.clone(), diag.line as u32));
let mut spec = HarnessSpec {
finding_id: format!("{:016x}", diag.stable_hash),
entry_file: entry.file,
entry_name: entry.function,
entry_kind: EntryKind::Function,
lang,
toolchain_id,
payload_slot: PayloadSlot::Param(0),
expected_cap,
constraint_hints: vec![],
sink_file,
sink_line,
spec_hash: String::new(),
};
spec.spec_hash = compute_spec_hash(&spec);
Ok(spec)
Err(UnsupportedReason::SpecDerivationFailed)
}
/// Returns the ordered list of derivation strategies that
/// [`HarnessSpec::from_finding_opts`] attempts. Used by the verifier when
/// it needs to report which candidates were tried before declaring an
/// `Inconclusive(SpecDerivationFailed)` verdict.
pub fn derivation_strategies() -> &'static [SpecDerivationStrategy] {
&[
SpecDerivationStrategy::FromFlowSteps,
SpecDerivationStrategy::FromRuleNamespace,
SpecDerivationStrategy::FromFuncSummaryWalk,
SpecDerivationStrategy::FromCallgraphEntry,
]
}
}
// ── Strategy 1: from flow_steps (original path) ──────────────────────────────
fn derive_from_flow_steps(diag: &Diag, evidence: &crate::evidence::Evidence) -> Option<HarnessSpec> {
if evidence.flow_steps.is_empty() {
return None;
}
let entry = outermost_entry(&evidence.flow_steps)?;
let lang = lang_from_path(&entry.file)?;
let expected_cap = Cap::from_bits_truncate(evidence.sink_caps);
if expected_cap.is_empty() {
return None;
}
let (sink_file, sink_line) = evidence
.flow_steps
.iter()
.rev()
.find(|s| matches!(s.kind, FlowStepKind::Sink))
.map(|s| (s.file.clone(), s.line))
.unwrap_or_else(|| (diag.path.clone(), diag.line as u32));
Some(finalize_spec(
diag,
entry.file,
entry.function,
lang,
expected_cap,
sink_file,
sink_line,
SpecDerivationStrategy::FromFlowSteps,
))
}
// ── Strategy 2: from rule namespace + sink evidence ──────────────────────────
/// Build a spec from a rule-namespace finding (e.g. `py.cmdi.os_system`,
/// `java.deser.readobject`, `rs.auth.missing_ownership_check.taint`) plus the
/// finding's sink evidence. The diag's path and line locate the sink call
/// site; the rule namespace's first segment selects the language, and the
/// second segment maps to a [`Cap`] via [`cap_for_rule_category`].
///
/// A synthetic single-step `Source` flow is constructed at the diag location
/// so downstream consumers that walk `evidence.flow_steps` keep working. The
/// entry function defaults to the sink-enclosing function from the diag's
/// evidence when available, otherwise to `"<unknown>"` (which keeps spec
/// hashing stable while signalling the lack of a concrete entry).
pub fn derive_from_rule_namespace(
diag: &Diag,
evidence: &crate::evidence::Evidence,
) -> Option<HarnessSpec> {
let mut iter = diag.id.split('.');
let lang_prefix = iter.next()?;
let category = iter.next()?;
let lang = lang_from_rule_prefix(lang_prefix)?;
// The category token must map to a known [`Cap`]; if not, defer to the
// callgraph-entry strategy or fall through to `SpecDerivationFailed`.
let category_cap = cap_for_rule_category(category)?;
// Sink caps: prefer explicit evidence; fall back to the category map.
let expected_cap = {
let from_ev = Cap::from_bits_truncate(evidence.sink_caps);
if !from_ev.is_empty() {
from_ev
} else {
category_cap
}
};
if expected_cap.is_empty() {
return None;
}
// Path is required to locate the sink and to extension-check the lang.
if diag.path.is_empty() {
return None;
}
// Cross-check: the diag's file extension must agree with the rule's
// language prefix when both are available. Disagreement is a stronger
// signal of a mis-rooted finding than a missing extension.
if let Some(path_lang) = lang_from_path(&diag.path) {
if path_lang != lang {
return None;
}
}
let entry_function = evidence
.sink
.as_ref()
.and_then(|s| s.snippet.clone())
.filter(|s| !s.is_empty())
.unwrap_or_else(|| "<unknown>".to_owned());
Some(finalize_spec(
diag,
diag.path.clone(),
entry_function,
lang,
expected_cap,
diag.path.clone(),
diag.line as u32,
SpecDerivationStrategy::FromRuleNamespace,
))
}
// ── Strategy 3: walk a FuncSummary for the sink's enclosing function ─────────
/// Build a spec by walking `summary` (the sink's enclosing function) for any
/// param-to-sink edge. When `summary` is `None` (the common case at verify
/// time, where global summaries are not threaded in), this returns `None`.
///
/// Picks the first `tainted_sink_params` entry as `PayloadSlot::Param(idx)`.
/// The synthetic flow has one source step pinned at the summary's parameter
/// and one sink step at the diag's line.
pub fn derive_from_func_summary(
diag: &Diag,
evidence: &crate::evidence::Evidence,
summary: Option<&FuncSummary>,
) -> Option<HarnessSpec> {
let summary = summary?;
let param_idx = *summary.tainted_sink_params.first()?;
let lang = Lang::from_slug(&summary.lang)?;
let expected_cap = {
let from_ev = Cap::from_bits_truncate(evidence.sink_caps);
if !from_ev.is_empty() {
from_ev
} else {
Cap::from_bits_truncate(summary.sink_caps)
}
};
if expected_cap.is_empty() {
return None;
}
let entry_file = if !summary.file_path.is_empty() {
summary.file_path.clone()
} else {
diag.path.clone()
};
let entry_name = summary.name.clone();
let mut spec = finalize_spec(
diag,
entry_file,
entry_name,
lang,
expected_cap,
diag.path.clone(),
diag.line as u32,
SpecDerivationStrategy::FromFuncSummaryWalk,
);
spec.payload_slot = PayloadSlot::Param(param_idx);
spec.spec_hash = compute_spec_hash(&spec);
Some(spec)
}
// ── Strategy 4: callgraph entry-kind ─────────────────────────────────────────
/// Build a spec by treating the sink's enclosing function as an entry point
/// when its rule namespace marks it as an externally-driven entry (HTTP route,
/// CLI subcommand). Currently fires when the rule id contains `.http.` or
/// `.cli.`; otherwise returns `None`.
///
/// Without a threaded [`crate::callgraph::CallGraph`] this strategy is a
/// minimal heuristic; it remains as the last-chance resort so the verifier
/// has something to drive against rather than declaring unsupported.
pub fn derive_from_callgraph_entry(
diag: &Diag,
evidence: &crate::evidence::Evidence,
) -> Option<HarnessSpec> {
let id = &diag.id;
let entry_kind = if id.contains(".http.") {
EntryKind::HttpRoute
} else if id.contains(".cli.") {
EntryKind::CliSubcommand
} else {
return None;
};
let lang = lang_from_path(&diag.path)?;
let expected_cap = Cap::from_bits_truncate(evidence.sink_caps);
if expected_cap.is_empty() {
return None;
}
let entry_function = evidence
.source
.as_ref()
.and_then(|s| s.snippet.clone())
.filter(|s| !s.is_empty())
.unwrap_or_else(|| "<unknown>".to_owned());
let mut spec = finalize_spec(
diag,
diag.path.clone(),
entry_function,
lang,
expected_cap,
diag.path.clone(),
diag.line as u32,
SpecDerivationStrategy::FromCallgraphEntry,
);
spec.entry_kind = entry_kind;
spec.spec_hash = compute_spec_hash(&spec);
Some(spec)
}
// ── Helpers ──────────────────────────────────────────────────────────────────
fn lang_from_path(path: &str) -> Option<Lang> {
let ext = Path::new(path).extension().and_then(|e| e.to_str()).unwrap_or("");
Lang::from_extension(ext)
}
/// Map the first segment of a Nyx rule id (`py`, `js`, `ts`, `java`, …) to a
/// [`Lang`]. Returns `None` for non-language prefixes (`taint-`, `cfg-`,
/// `state-`).
fn lang_from_rule_prefix(prefix: &str) -> Option<Lang> {
match prefix {
"rs" | "rust" => Some(Lang::Rust),
"py" | "python" => Some(Lang::Python),
"js" | "javascript" => Some(Lang::JavaScript),
"ts" | "typescript" => Some(Lang::TypeScript),
"java" => Some(Lang::Java),
"go" => Some(Lang::Go),
"php" => Some(Lang::Php),
"rb" | "ruby" => Some(Lang::Ruby),
"c" => Some(Lang::C),
"cpp" => Some(Lang::Cpp),
_ => None,
}
}
/// Map the second segment of a Nyx rule id (e.g. `cmdi`, `xss`, `sqli`,
/// `deser`, `ssrf`, `path`, `auth`) to a [`Cap`].
fn cap_for_rule_category(category: &str) -> Option<Cap> {
match category {
"cmdi" | "command" => Some(Cap::SHELL_ESCAPE),
"xss" => Some(Cap::HTML_ESCAPE),
"sqli" | "sql" => Some(Cap::SQL_QUERY),
"code_exec" | "eval" => Some(Cap::CODE_EXEC),
"ssrf" => Some(Cap::SSRF),
"path" | "traversal" => Some(Cap::FILE_IO),
"deser" | "deserialize" => Some(Cap::DESERIALIZE),
"auth" => Some(Cap::UNAUTHORIZED_ID),
"format" | "fmtstr" => Some(Cap::FMT_STRING),
"ldap" => Some(Cap::LDAP_INJECTION),
"xpath" => Some(Cap::XPATH_INJECTION),
"header" => Some(Cap::HEADER_INJECTION),
"redirect" => Some(Cap::OPEN_REDIRECT),
"ssti" | "template" => Some(Cap::SSTI),
"xxe" => Some(Cap::XXE),
"proto" | "prototype" => Some(Cap::PROTOTYPE_POLLUTION),
_ => None,
}
}
#[allow(clippy::too_many_arguments)]
fn finalize_spec(
diag: &Diag,
entry_file: String,
entry_name: String,
lang: Lang,
expected_cap: Cap,
sink_file: String,
sink_line: u32,
derivation: SpecDerivationStrategy,
) -> HarnessSpec {
let toolchain_id = toolchain_id_for_lang(lang).to_owned();
let mut spec = HarnessSpec {
finding_id: format!("{:016x}", diag.stable_hash),
entry_file,
entry_name,
entry_kind: EntryKind::Function,
lang,
toolchain_id,
payload_slot: PayloadSlot::Param(0),
expected_cap,
constraint_hints: vec![],
sink_file,
sink_line,
spec_hash: String::new(),
derivation,
};
spec.spec_hash = compute_spec_hash(&spec);
spec
}
/// Walk a synthetic single-step flow to satisfy callers that expect a `FlowStep`
/// vector. Used by strategies 24 when they need to materialise a flow for
/// downstream consumers.
#[allow(dead_code)]
pub(crate) fn synthetic_flow(diag: &Diag, function: &str) -> Vec<FlowStep> {
vec![
FlowStep {
step: 1,
kind: FlowStepKind::Source,
file: diag.path.clone(),
line: diag.line as u32,
col: diag.col as u32,
snippet: None,
variable: None,
callee: None,
function: Some(function.to_owned()),
is_cross_file: false,
},
FlowStep {
step: 2,
kind: FlowStepKind::Sink,
file: diag.path.clone(),
line: diag.line as u32,
col: diag.col as u32,
snippet: None,
variable: None,
callee: None,
function: Some(function.to_owned()),
is_cross_file: false,
},
]
}
/// Walk `flow_steps` and return the entry point: the enclosing function of
@ -352,12 +687,32 @@ mod tests {
}
#[test]
fn from_finding_err_no_flow_steps() {
fn from_finding_err_no_flow_steps_falls_through_to_spec_derivation_failed() {
// PrePhase 01, this returned `NoFlowSteps` directly. After the
// typed-strategy rewrite, the verifier still tries the rule-namespace
// and func-summary strategies; only when *every* strategy fails does
// it surface `SpecDerivationFailed`. Empty evidence + empty rule
// id leaves nothing for any strategy to chew on.
let diag = crate::commands::scan::Diag {
confidence: Some(Confidence::Medium),
evidence: Some(Evidence::default()),
..Default::default()
};
assert_eq!(
HarnessSpec::from_finding(&diag).unwrap_err(),
UnsupportedReason::SpecDerivationFailed
);
}
#[test]
fn from_finding_err_no_evidence_returns_no_flow_steps() {
// When the finding carries no Evidence struct at all, there is no
// signal for any strategy. Reported as `NoFlowSteps`.
let diag = crate::commands::scan::Diag {
confidence: Some(Confidence::Medium),
evidence: None,
..Default::default()
};
assert_eq!(
HarnessSpec::from_finding(&diag).unwrap_err(),
UnsupportedReason::NoFlowSteps
@ -423,6 +778,7 @@ mod tests {
sink_file: "src/handler.rs".into(),
sink_line: 10,
spec_hash: String::new(),
derivation: SpecDerivationStrategy::FromFlowSteps,
};
spec.spec_hash = compute_spec_hash(&spec);
spec
@ -492,4 +848,195 @@ mod tests {
s2.spec_hash = compute_spec_hash(&s2);
assert_ne!(s1.spec_hash, s2.spec_hash, "toolchain_id mutation must change spec_hash");
}
// ── Phase 01: derivation strategies ──────────────────────────────────────
fn diag_with_rule_id(id: &str, path: &str, sink_caps: u32) -> crate::commands::scan::Diag {
crate::commands::scan::Diag {
id: id.into(),
path: path.into(),
line: 12,
col: 4,
confidence: Some(Confidence::Medium),
evidence: Some(Evidence {
sink_caps,
..Default::default()
}),
..Default::default()
}
}
#[test]
fn derivation_strategies_returns_ordered_list() {
let strategies = HarnessSpec::derivation_strategies();
assert_eq!(strategies.len(), 4);
assert_eq!(strategies[0], SpecDerivationStrategy::FromFlowSteps);
assert_eq!(strategies[1], SpecDerivationStrategy::FromRuleNamespace);
assert_eq!(strategies[2], SpecDerivationStrategy::FromFuncSummaryWalk);
assert_eq!(strategies[3], SpecDerivationStrategy::FromCallgraphEntry);
}
#[test]
fn flow_steps_strategy_records_derivation_tag() {
use crate::labels::Cap;
let evidence = Evidence {
flow_steps: vec![
source_step("src/handler.py", "handle_request"),
sink_step("src/handler.py"),
],
sink_caps: Cap::SHELL_ESCAPE.bits(),
..Default::default()
};
let diag = crate::commands::scan::Diag {
confidence: Some(Confidence::High),
evidence: Some(evidence),
path: "src/handler.py".into(),
..Default::default()
};
let spec = HarnessSpec::from_finding(&diag).unwrap();
assert_eq!(spec.derivation, SpecDerivationStrategy::FromFlowSteps);
assert_eq!(spec.entry_name, "handle_request");
}
#[test]
fn rule_namespace_strategy_fires_without_flow_steps() {
use crate::labels::Cap;
let diag = diag_with_rule_id("py.cmdi.os_system", "app/handler.py", Cap::SHELL_ESCAPE.bits());
let spec = HarnessSpec::from_finding(&diag).unwrap();
assert_eq!(spec.derivation, SpecDerivationStrategy::FromRuleNamespace);
assert_eq!(spec.lang, Lang::Python);
assert_eq!(spec.expected_cap, Cap::SHELL_ESCAPE);
assert_eq!(spec.entry_file, "app/handler.py");
assert_eq!(spec.sink_line, 12);
}
#[test]
fn rule_namespace_strategy_picks_cap_from_category_when_sink_caps_zero() {
let diag = diag_with_rule_id("java.deser.readobject", "src/Main.java", 0);
let spec = HarnessSpec::from_finding(&diag).unwrap();
assert_eq!(spec.derivation, SpecDerivationStrategy::FromRuleNamespace);
assert_eq!(spec.lang, Lang::Java);
assert_eq!(spec.expected_cap, Cap::DESERIALIZE);
}
#[test]
fn rule_namespace_strategy_rejects_path_lang_mismatch() {
use crate::labels::Cap;
// `py.*` rule id, but a `.java` file — the cross-check refuses.
let diag = diag_with_rule_id("py.cmdi.os_system", "src/Main.java", Cap::SHELL_ESCAPE.bits());
assert_eq!(
HarnessSpec::from_finding(&diag).unwrap_err(),
UnsupportedReason::SpecDerivationFailed
);
}
#[test]
fn rule_namespace_strategy_rejects_unknown_category() {
// Cap evidence zero AND category unknown → no fallback cap available.
let diag = diag_with_rule_id("py.weirdcategory.unknown", "app/handler.py", 0);
assert_eq!(
HarnessSpec::from_finding(&diag).unwrap_err(),
UnsupportedReason::SpecDerivationFailed
);
}
#[test]
fn rule_namespace_strategy_skips_legacy_taint_ids() {
use crate::labels::Cap;
// `taint-...` is *not* a language-namespace prefix; rule-namespace
// strategy must skip it so the next strategy can try.
let diag = diag_with_rule_id("taint-unsanitised-flow", "app/handler.py", Cap::SHELL_ESCAPE.bits());
// No flow_steps, no http/cli marker → ends in SpecDerivationFailed.
assert_eq!(
HarnessSpec::from_finding(&diag).unwrap_err(),
UnsupportedReason::SpecDerivationFailed
);
}
#[test]
fn func_summary_strategy_picks_first_tainted_param() {
use crate::labels::Cap;
let evidence = Evidence::default();
let diag = crate::commands::scan::Diag {
confidence: Some(Confidence::Medium),
evidence: Some(evidence.clone()),
path: "src/lib.rs".into(),
line: 7,
..Default::default()
};
let summary = FuncSummary {
name: "open_path".into(),
file_path: "src/lib.rs".into(),
lang: "rust".into(),
param_count: 2,
param_names: vec!["root".into(), "name".into()],
source_caps: 0,
sanitizer_caps: 0,
sink_caps: Cap::FILE_IO.bits(),
propagating_params: vec![],
propagates_taint: false,
tainted_sink_params: vec![1],
param_to_sink: vec![],
callees: vec![],
container: String::new(),
disambig: None,
kind: Default::default(),
module_path: None,
rust_use_map: None,
rust_wildcards: None,
hierarchy_edges: vec![],
entry_kind: None,
};
let spec = derive_from_func_summary(&diag, &evidence, Some(&summary))
.expect("summary strategy must fire");
assert_eq!(spec.derivation, SpecDerivationStrategy::FromFuncSummaryWalk);
assert!(matches!(spec.payload_slot, PayloadSlot::Param(1)));
assert_eq!(spec.entry_name, "open_path");
assert_eq!(spec.expected_cap, Cap::FILE_IO);
}
#[test]
fn callgraph_entry_strategy_fires_on_http_rule_id() {
use crate::labels::Cap;
// `http` is not in `cap_for_rule_category`, so rule-namespace bails.
// The id contains `.http.`, so callgraph-entry catches it.
let diag = diag_with_rule_id("py.http.flask_route", "app/views.py", Cap::SSRF.bits());
let spec = HarnessSpec::from_finding(&diag).unwrap();
assert_eq!(spec.derivation, SpecDerivationStrategy::FromCallgraphEntry);
assert!(matches!(spec.entry_kind, EntryKind::HttpRoute));
assert_eq!(spec.lang, Lang::Python);
}
#[test]
fn callgraph_entry_strategy_fires_on_cli_rule_id() {
use crate::labels::Cap;
let diag = diag_with_rule_id("rs.cli.parse_subcommand", "src/main.rs", Cap::SHELL_ESCAPE.bits());
let spec = HarnessSpec::from_finding(&diag).unwrap();
assert_eq!(spec.derivation, SpecDerivationStrategy::FromCallgraphEntry);
assert!(matches!(spec.entry_kind, EntryKind::CliSubcommand));
}
#[test]
fn strategy_priority_flow_steps_beats_rule_namespace() {
use crate::labels::Cap;
// Both signals present: flow_steps wins because it appears first
// in the strategy order.
let evidence = Evidence {
flow_steps: vec![
source_step("src/handler.py", "handle_request"),
sink_step("src/handler.py"),
],
sink_caps: Cap::SHELL_ESCAPE.bits(),
..Default::default()
};
let diag = crate::commands::scan::Diag {
id: "py.cmdi.os_system".into(),
confidence: Some(Confidence::High),
evidence: Some(evidence),
path: "src/handler.py".into(),
..Default::default()
};
let spec = HarnessSpec::from_finding(&diag).unwrap();
assert_eq!(spec.derivation, SpecDerivationStrategy::FromFlowSteps);
}
}

View file

@ -191,6 +191,7 @@ mod tests {
sink_file: "handler.py".into(),
sink_line: 5,
spec_hash: "abcd1234abcd1234".into(),
derivation: crate::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}

View file

@ -11,7 +11,7 @@ use crate::dynamic::sandbox::{toolchain_id_with_digest, SandboxOptions};
use crate::dynamic::spec::{HarnessSpec, SPEC_FORMAT_VERSION};
use crate::dynamic::telemetry::{self, TelemetryEvent};
use crate::dynamic::toolchain;
use crate::evidence::{InconclusiveReason, UnsupportedReason};
use crate::evidence::{InconclusiveReason, SpecDerivationStrategy, UnsupportedReason};
use crate::utils::config::Config;
use std::path::Path;
use std::time::Instant;
@ -152,6 +152,90 @@ fn insert_verdict_cache(
);
}
/// Decide whether a [`HarnessSpec::from_finding_opts`] failure should surface
/// as `Unsupported` (the finding is genuinely unmodellable) or
/// `Inconclusive(SpecDerivationFailed)` (the rule namespace or sink evidence
/// carried enough signal that derivation *should* have worked).
///
/// The rule-of-thumb: if any spec-derivation strategy could plausibly have
/// fired (i.e. the finding had a usable rule namespace, non-empty path, or
/// non-zero sink caps) yet none produced a spec, the failure is
/// **Inconclusive** — we tried and missed. Otherwise it's **Unsupported**.
fn spec_derivation_failed_verdict(
finding_id: String,
diag: &Diag,
reason: UnsupportedReason,
) -> VerifyResult {
if matches!(reason, UnsupportedReason::SpecDerivationFailed) && should_be_inconclusive(diag) {
let strategies: Vec<SpecDerivationStrategy> =
HarnessSpec::derivation_strategies().to_vec();
let hint = derivation_failure_hint(diag);
return VerifyResult {
finding_id,
status: VerifyStatus::Inconclusive,
triggered_payload: None,
reason: None,
inconclusive_reason: Some(InconclusiveReason::SpecDerivationFailed {
tried: strategies,
hint,
}),
detail: None,
attempts: vec![],
toolchain_match: None,
};
}
VerifyResult {
finding_id,
status: VerifyStatus::Unsupported,
triggered_payload: None,
reason: Some(reason),
inconclusive_reason: None,
detail: None,
attempts: vec![],
toolchain_match: None,
}
}
/// True when the finding has *some* derivable signal (rule namespace, sink
/// caps, or evidence) so a spec-derivation failure should be surfaced as
/// `Inconclusive` rather than `Unsupported`.
fn should_be_inconclusive(diag: &Diag) -> bool {
let has_rule_ns = diag.id.split('.').count() >= 2
&& !diag.id.starts_with("taint-")
&& !diag.id.starts_with("cfg-")
&& !diag.id.starts_with("state-");
let has_evidence = diag
.evidence
.as_ref()
.map(|e| e.sink_caps != 0 || !e.flow_steps.is_empty() || e.sink.is_some())
.unwrap_or(false);
has_rule_ns || has_evidence
}
fn derivation_failure_hint(diag: &Diag) -> String {
let ev = match diag.evidence.as_ref() {
Some(e) => e,
None => return "no evidence on finding".to_owned(),
};
let mut parts: Vec<String> = Vec::new();
if !diag.id.is_empty() {
parts.push(format!("rule_id={}", diag.id));
}
if ev.sink_caps == 0 {
parts.push("sink_caps=0".to_owned());
}
if ev.flow_steps.is_empty() {
parts.push("no_flow_steps".to_owned());
}
if diag.path.is_empty() {
parts.push("empty_path".to_owned());
} else {
parts.push(format!("path={}", diag.path));
}
parts.join("; ")
}
/// Try to dynamically confirm a static finding.
///
/// Never fails: every error path collapses into a [`VerifyStatus`] so the
@ -162,16 +246,7 @@ pub fn verify_finding(diag: &Diag, opts: &VerifyOptions) -> VerifyResult {
let spec = match HarnessSpec::from_finding_opts(diag, opts.verify_all_confidence) {
Ok(s) => s,
Err(reason) => {
return VerifyResult {
finding_id,
status: VerifyStatus::Unsupported,
triggered_payload: None,
reason: Some(reason),
inconclusive_reason: None,
detail: None,
attempts: vec![],
toolchain_match: None,
};
return spec_derivation_failed_verdict(finding_id, diag, reason);
}
};
@ -271,7 +346,7 @@ pub fn verify_finding(diag: &Diag, opts: &VerifyOptions) -> VerifyResult {
let event = TelemetryEvent::new(
&spec,
verdict.status,
verdict.inconclusive_reason,
verdict.inconclusive_reason.clone(),
toolchain_match,
elapsed,
build_attempts,

View file

@ -188,9 +188,48 @@ pub enum UnsupportedReason {
LangUnsupported,
}
/// Typed reason for `VerifyStatus::Inconclusive`.
/// Spec-derivation strategy attempted by [`crate::dynamic::spec::HarnessSpec::from_finding_opts`].
///
/// Lives in `evidence.rs` (not `dynamic::spec`) so that
/// [`InconclusiveReason::SpecDerivationFailed`] can carry a `Vec` of attempted
/// strategies without requiring the `dynamic` feature. The canonical
/// accessor is `crate::dynamic::spec::SpecDerivationStrategy` (re-export).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub enum SpecDerivationStrategy {
/// Walk the finding's `evidence.flow_steps`. Original derivation path:
/// the outermost `Source` step with a `function` annotation becomes the
/// entry point. Requires non-empty `flow_steps`.
FromFlowSteps,
/// Inspect the diag's `id` (rule namespace, e.g. `py.cmdi.os_system`,
/// `java.deser.readobject`, `rs.auth.missing_ownership_check.taint`) plus
/// `evidence.sink_caps` to synthesize a single-step flow. Used when the
/// rule namespace alone identifies a sink class.
FromRuleNamespace,
/// Walk a matching [`crate::summary::FuncSummary`] for the sink's
/// enclosing function and construct a synthetic param-to-sink flow per
/// parameter when no real `flow_steps` exist.
FromFuncSummaryWalk,
/// Resolve an entry point through the call graph by treating an entry-kind
/// function (HTTP route, CLI handler) as the spec entry.
FromCallgraphEntry,
}
impl fmt::Display for SpecDerivationStrategy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
Self::FromFlowSteps => "from_flow_steps",
Self::FromRuleNamespace => "from_rule_namespace",
Self::FromFuncSummaryWalk => "from_func_summary_walk",
Self::FromCallgraphEntry => "from_callgraph_entry",
};
f.write_str(s)
}
}
/// Typed reason for `VerifyStatus::Inconclusive`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub enum InconclusiveReason {
/// The oracle fired but the sink-reachability probe did not — likely an
/// oracle collision where a coincidental output matched the marker pattern.
@ -202,6 +241,17 @@ pub enum InconclusiveReason {
BuildFailed,
/// Sandbox error (spawn failure, I/O error, etc.).
SandboxError,
/// Every [`SpecDerivationStrategy`] candidate was attempted but none
/// produced a runnable [`crate::dynamic::spec::HarnessSpec`]. Distinct
/// from [`UnsupportedReason::SpecDerivationFailed`]: the latter covers
/// genuinely unmodellable findings (e.g. unknown language, zero sink
/// bits), while this variant signals that the rule namespace, sink
/// evidence, or call graph carried enough signal that derivation
/// *should* have worked but did not.
SpecDerivationFailed {
tried: Vec<SpecDerivationStrategy>,
hint: String,
},
}
/// High-level outcome of a dynamic verification attempt.

View file

@ -484,6 +484,7 @@ fn format_dynamic_verdict_annotation(dv: &crate::evidence::VerifyResult) -> Stri
VerifyStatus::Inconclusive => {
let reason = dv
.inconclusive_reason
.as_ref()
.map(format_inconclusive_reason)
.unwrap_or_else(|| {
dv.detail
@ -512,13 +513,20 @@ fn format_unsupported_reason(r: &crate::evidence::UnsupportedReason) -> String {
}
}
fn format_inconclusive_reason(r: crate::evidence::InconclusiveReason) -> String {
fn format_inconclusive_reason(r: &crate::evidence::InconclusiveReason) -> String {
use crate::evidence::InconclusiveReason;
match r {
InconclusiveReason::OracleCollisionSuspected => "oracle collision".to_string(),
InconclusiveReason::NonReproducible => "non-reproducible".to_string(),
InconclusiveReason::BuildFailed => "build failed".to_string(),
InconclusiveReason::SandboxError => "sandbox error".to_string(),
InconclusiveReason::SpecDerivationFailed { hint, .. } => {
if hint.is_empty() {
"spec derivation failed".to_string()
} else {
format!("spec derivation failed ({hint})")
}
}
}
}

View file

@ -0,0 +1,9 @@
# Fixture: spec derived via FromCallgraphEntry (rule id matches `*.http.*`,
# entry point classified as HttpRoute).
from flask import Flask, request
app = Flask(__name__)
@app.route("/echo")
def echo():
return request.args.get("q", "")

View file

@ -0,0 +1,6 @@
# Fixture: spec derived via FromFlowSteps (taint flow with explicit source/sink).
import os
def handle_request(payload):
cmd = payload
os.system(cmd)

View file

@ -0,0 +1,11 @@
// Fixture: spec derived via FromFuncSummaryWalk (FuncSummary records
// `tainted_sink_params` for a param that flows to a sink, without an
// in-evidence flow_steps trace).
fn read_path(_root: &str, name: &str) -> std::io::Result<Vec<u8>> {
std::fs::read(name)
}
fn main() {
let _ = read_path("/", "/etc/passwd");
}

View file

@ -0,0 +1,6 @@
# Fixture: spec derived via FromRuleNamespace (AST pattern `py.cmdi.os_system`
# without a taint flow).
import os
def run_user_command(user_arg):
os.system(user_arg)

View file

@ -33,6 +33,7 @@ mod repro_determinism_tests {
sink_file: "app.py".into(),
sink_line: 10,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}
@ -163,6 +164,7 @@ mod repro_determinism_tests {
sink_file: "src/entry.rs".into(),
sink_line: 18,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}
@ -293,6 +295,7 @@ fn main() {
sink_file: "tests/dynamic_fixtures/js/sqli_positive.js".into(),
sink_line: 8,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}
@ -346,6 +349,7 @@ fn main() {
sink_file: "tests/dynamic_fixtures/go/sqli_positive.go".into(),
sink_line: 12,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}
@ -399,6 +403,7 @@ fn main() {
sink_file: "tests/dynamic_fixtures/java/sqli_positive.java".into(),
sink_line: 9,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}
@ -452,6 +457,7 @@ fn main() {
sink_file: "tests/dynamic_fixtures/php/sqli_positive.php".into(),
sink_line: 9,
spec_hash: spec_hash.to_owned(),
derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps,
}
}

View file

@ -0,0 +1,281 @@
//! Phase 01, Track A.1: integration coverage for
//! `HarnessSpec::from_finding_opts` strategy fall-through.
//!
//! Exercises each `SpecDerivationStrategy` end-to-end:
//!
//! 1. [`FromFlowSteps`] — explicit flow_steps in evidence.
//! 2. [`FromRuleNamespace`] — rule id namespace + sink_caps.
//! 3. [`FromFuncSummaryWalk`] — walking `FuncSummary::tainted_sink_params`.
//! 4. [`FromCallgraphEntry`] — `*.http.*` rule id → HttpRoute entry.
//!
//! Also asserts that
//! [`crate::evidence::InconclusiveReason::SpecDerivationFailed`] is surfaced
//! when no strategy succeeds but the finding had derivable signal.
//!
//! Gated on `--features dynamic`; the strategy types live in
//! `dynamic::spec` but the `InconclusiveReason` payload is always-present.
#[cfg(feature = "dynamic")]
mod spec_strategies {
use nyx_scanner::commands::scan::Diag;
use nyx_scanner::dynamic::spec::{
derive_from_callgraph_entry, derive_from_func_summary, derive_from_rule_namespace,
EntryKind, HarnessSpec, PayloadSlot, SpecDerivationStrategy,
};
use nyx_scanner::dynamic::verify::{verify_finding, VerifyOptions};
use nyx_scanner::evidence::{
Confidence, Evidence, FlowStep, FlowStepKind, InconclusiveReason, UnsupportedReason,
VerifyStatus,
};
use nyx_scanner::labels::Cap;
use nyx_scanner::patterns::{FindingCategory, Severity};
use nyx_scanner::summary::FuncSummary;
fn make_diag(id: &str, path: &str, line: usize) -> Diag {
Diag {
path: path.into(),
line,
col: 0,
severity: Severity::High,
id: id.into(),
category: FindingCategory::Security,
path_validated: false,
guard_kind: None,
message: None,
labels: vec![],
confidence: Some(Confidence::High),
evidence: Some(Evidence::default()),
rank_score: None,
rank_reason: None,
suppressed: false,
suppression: None,
rollup: None,
finding_id: String::new(),
alternative_finding_ids: vec![],
stable_hash: 0,
}
}
fn source_step(file: &str, function: &str) -> FlowStep {
FlowStep {
step: 1,
kind: FlowStepKind::Source,
file: file.into(),
line: 4,
col: 0,
snippet: None,
variable: Some("payload".into()),
callee: None,
function: Some(function.into()),
is_cross_file: false,
}
}
fn sink_step(file: &str) -> FlowStep {
FlowStep {
step: 2,
kind: FlowStepKind::Sink,
file: file.into(),
line: 6,
col: 0,
snippet: Some("os.system".into()),
variable: None,
callee: Some("os.system".into()),
function: None,
is_cross_file: false,
}
}
// ── Strategy 1: FromFlowSteps ────────────────────────────────────────────
#[test]
fn from_flow_steps_strategy_drives_taint_finding() {
let mut diag = make_diag(
"taint-unsanitised-flow (source 4:0)",
"tests/dynamic_fixtures/spec_strategies/flow_steps_taint.py",
6,
);
let mut ev = Evidence::default();
ev.flow_steps = vec![
source_step("tests/dynamic_fixtures/spec_strategies/flow_steps_taint.py", "handle_request"),
sink_step("tests/dynamic_fixtures/spec_strategies/flow_steps_taint.py"),
];
ev.sink_caps = Cap::SHELL_ESCAPE.bits();
diag.evidence = Some(ev);
let spec = HarnessSpec::from_finding(&diag).expect("flow_steps strategy must succeed");
assert_eq!(spec.derivation, SpecDerivationStrategy::FromFlowSteps);
assert_eq!(spec.entry_name, "handle_request");
assert_eq!(spec.expected_cap, Cap::SHELL_ESCAPE);
}
// ── Strategy 2: FromRuleNamespace ────────────────────────────────────────
#[test]
fn from_rule_namespace_strategy_drives_ast_finding() {
let mut diag = make_diag(
"py.cmdi.os_system",
"tests/dynamic_fixtures/spec_strategies/rule_namespace_cmdi.py",
6,
);
// Empty flow_steps, but sink_caps set on evidence.
let mut ev = Evidence::default();
ev.sink_caps = Cap::SHELL_ESCAPE.bits();
diag.evidence = Some(ev);
let spec = HarnessSpec::from_finding(&diag).expect("rule-namespace strategy must succeed");
assert_eq!(spec.derivation, SpecDerivationStrategy::FromRuleNamespace);
assert_eq!(spec.expected_cap, Cap::SHELL_ESCAPE);
assert_eq!(spec.toolchain_id, "python-3");
}
#[test]
fn from_rule_namespace_called_directly_returns_some() {
let mut diag = make_diag(
"java.deser.readobject",
"src/Main.java",
12,
);
let mut ev = Evidence::default();
ev.sink_caps = Cap::DESERIALIZE.bits();
diag.evidence = Some(ev.clone());
let spec = derive_from_rule_namespace(&diag, &ev).expect("must succeed");
assert_eq!(spec.derivation, SpecDerivationStrategy::FromRuleNamespace);
assert_eq!(spec.expected_cap, Cap::DESERIALIZE);
}
// ── Strategy 3: FromFuncSummaryWalk ──────────────────────────────────────
#[test]
fn from_func_summary_strategy_picks_first_tainted_param() {
let mut diag = make_diag(
"cfg-unguarded-sink",
"tests/dynamic_fixtures/spec_strategies/func_summary_walk.rs",
5,
);
diag.evidence = Some(Evidence::default());
let summary = FuncSummary {
name: "read_path".into(),
file_path: "tests/dynamic_fixtures/spec_strategies/func_summary_walk.rs".into(),
lang: "rust".into(),
param_count: 2,
param_names: vec!["root".into(), "name".into()],
source_caps: 0,
sanitizer_caps: 0,
sink_caps: Cap::FILE_IO.bits(),
propagating_params: vec![],
propagates_taint: false,
tainted_sink_params: vec![1],
param_to_sink: vec![],
callees: vec![],
container: String::new(),
disambig: None,
kind: Default::default(),
module_path: None,
rust_use_map: None,
rust_wildcards: None,
hierarchy_edges: vec![],
entry_kind: None,
};
let spec =
derive_from_func_summary(&diag, diag.evidence.as_ref().unwrap(), Some(&summary))
.expect("summary strategy must succeed");
assert_eq!(spec.derivation, SpecDerivationStrategy::FromFuncSummaryWalk);
assert!(matches!(spec.payload_slot, PayloadSlot::Param(1)));
assert_eq!(spec.entry_name, "read_path");
}
// ── Strategy 4: FromCallgraphEntry ───────────────────────────────────────
#[test]
fn from_callgraph_entry_strategy_marks_http_route() {
let mut diag = make_diag(
"py.http.flask_route",
"tests/dynamic_fixtures/spec_strategies/callgraph_entry_http.py",
8,
);
let mut ev = Evidence::default();
ev.sink_caps = Cap::SSRF.bits();
diag.evidence = Some(ev);
let spec = HarnessSpec::from_finding(&diag).expect("callgraph-entry strategy must succeed");
assert_eq!(spec.derivation, SpecDerivationStrategy::FromCallgraphEntry);
assert!(matches!(spec.entry_kind, EntryKind::HttpRoute));
}
#[test]
fn from_callgraph_entry_called_directly_returns_some() {
let mut diag = make_diag(
"rs.cli.subcommand_parse",
"src/main.rs",
10,
);
let mut ev = Evidence::default();
ev.sink_caps = Cap::SHELL_ESCAPE.bits();
diag.evidence = Some(ev.clone());
let spec = derive_from_callgraph_entry(&diag, &ev).expect("must succeed");
assert_eq!(spec.derivation, SpecDerivationStrategy::FromCallgraphEntry);
assert!(matches!(spec.entry_kind, EntryKind::CliSubcommand));
}
// ── Failure path: Inconclusive(SpecDerivationFailed) ─────────────────────
#[test]
fn verify_finding_surfaces_inconclusive_when_strategies_exhaust_signal() {
// Rule namespace identifies a known sink class (`cmdi`), but the path
// language disagrees with the rule's language and there are no
// flow_steps to fall back on. Every strategy bails — but the finding
// had usable signal, so the verifier reports Inconclusive.
let mut diag = make_diag("py.cmdi.os_system", "src/Main.java", 5);
let mut ev = Evidence::default();
ev.sink_caps = Cap::SHELL_ESCAPE.bits();
diag.evidence = Some(ev);
let result = verify_finding(&diag, &VerifyOptions::default());
assert_eq!(result.status, VerifyStatus::Inconclusive);
match result.inconclusive_reason {
Some(InconclusiveReason::SpecDerivationFailed { tried, hint }) => {
assert_eq!(tried.len(), 4);
assert!(!hint.is_empty(), "hint must summarise the failed inputs");
}
other => panic!("expected SpecDerivationFailed, got {other:?}"),
}
}
#[test]
fn verify_finding_surfaces_unsupported_when_no_signal_at_all() {
// No evidence struct, no rule namespace, no path. Genuinely
// unmodellable → Unsupported(NoFlowSteps).
let diag = make_diag("", "", 0);
let diag = Diag {
evidence: None,
..diag
};
let result = verify_finding(&diag, &VerifyOptions::default());
assert_eq!(result.status, VerifyStatus::Unsupported);
assert_eq!(result.reason, Some(UnsupportedReason::NoFlowSteps));
}
// ── Strategy ordering ────────────────────────────────────────────────────
#[test]
fn strategy_priority_flow_steps_wins_over_rule_namespace() {
// Both signals present: flow_steps wins because it's first in
// `HarnessSpec::derivation_strategies()`.
let mut diag = make_diag(
"py.cmdi.os_system",
"tests/dynamic_fixtures/spec_strategies/flow_steps_taint.py",
6,
);
let mut ev = Evidence::default();
ev.flow_steps = vec![
source_step("tests/dynamic_fixtures/spec_strategies/flow_steps_taint.py", "handle_request"),
sink_step("tests/dynamic_fixtures/spec_strategies/flow_steps_taint.py"),
];
ev.sink_caps = Cap::SHELL_ESCAPE.bits();
diag.evidence = Some(ev);
let spec = HarnessSpec::from_finding(&diag).unwrap();
assert_eq!(spec.derivation, SpecDerivationStrategy::FromFlowSteps);
}
}