diff --git a/src/dynamic/framework/adapters/mod.rs b/src/dynamic/framework/adapters/mod.rs index 4fee76c7..674952a2 100644 --- a/src/dynamic/framework/adapters/mod.rs +++ b/src/dynamic/framework/adapters/mod.rs @@ -29,8 +29,13 @@ pub mod php_unserialize; pub mod pp_json_deep_assign; pub mod pp_lodash_merge; pub mod pp_object_assign; +pub mod python_django; +pub mod python_fastapi; +pub mod python_flask; pub mod python_jinja2; pub mod python_pickle; +pub mod python_routes; +pub mod python_starlette; pub mod redirect_go; pub mod redirect_java; pub mod redirect_js; @@ -68,8 +73,12 @@ pub use php_unserialize::PhpUnserializeAdapter; pub use pp_json_deep_assign::{PpJsonDeepAssignJsAdapter, PpJsonDeepAssignTsAdapter}; pub use pp_lodash_merge::{PpLodashMergeJsAdapter, PpLodashMergeTsAdapter}; pub use pp_object_assign::{PpObjectAssignJsAdapter, PpObjectAssignTsAdapter}; +pub use python_django::PythonDjangoAdapter; +pub use python_fastapi::PythonFastApiAdapter; +pub use python_flask::PythonFlaskAdapter; pub use python_jinja2::PythonJinja2Adapter; pub use python_pickle::PythonPickleAdapter; +pub use python_starlette::PythonStarletteAdapter; pub use redirect_go::RedirectGoAdapter; pub use redirect_java::RedirectJavaAdapter; pub use redirect_js::RedirectJsAdapter; diff --git a/src/dynamic/framework/adapters/python_django.rs b/src/dynamic/framework/adapters/python_django.rs new file mode 100644 index 00000000..2cbdd216 --- /dev/null +++ b/src/dynamic/framework/adapters/python_django.rs @@ -0,0 +1,335 @@ +//! Python Django [`super::super::FrameworkAdapter`] (Phase 12 — Track L.10). +//! +//! Two recognition shapes: +//! +//! - `urls.py` registrations: `path("…", view)`, `re_path(r"…", view)`, +//! `url(r"…", view)`. Adapter matches the second argument's last +//! identifier segment (so `views.list_users`, `MyView.as_view()`, +//! and bare `list_users` all hit the same predicate) against +//! `summary.name`. +//! - Class-based views: a method named `get` / `post` / `put` / +//! `patch` / `delete` / `head` / `options` on a class extending +//! `View` / `APIView` / `ViewSet` / `TemplateView`. The route +//! path is left as `"/"` when no matching `urls.py` entry can be +//! found in the same file — the runner is still able to drive +//! the view through `RequestFactory`, which does not require a +//! real URL conf. + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding, HttpMethod, RouteShape}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; +use tree_sitter::Node; + +use super::python_routes::{ + bind_path_params, find_python_function, function_formal_names, source_imports_django, +}; + +pub struct PythonDjangoAdapter; + +const ADAPTER_NAME: &str = "python-django"; + +fn http_method_from_method_name(name: &str) -> Option { + HttpMethod::from_ident(name) +} + +fn class_super_looks_like_view(text: &str) -> bool { + text.contains("View") + || text.contains("APIView") + || text.contains("ViewSet") + || text.contains("TemplateView") + || text.contains("ListView") + || text.contains("DetailView") + || text.contains("CreateView") + || text.contains("UpdateView") + || text.contains("DeleteView") +} + +fn enclosing_class<'a>(node: Node<'a>) -> Option> { + let mut cur = node.parent(); + while let Some(p) = cur { + if p.kind() == "class_definition" { + return Some(p); + } + cur = p.parent(); + } + None +} + +/// Walk `urls.py`-style registrations (`path(...)`, `re_path(...)`, +/// `url(...)`) and return `Some(path_template)` when one of them +/// references `target` as the second positional argument. When +/// `class_target` is `Some`, an `as_view`-based registration whose +/// receiver class matches is also accepted (so `path("users/", +/// UserView.as_view())` binds the class's method-as-view). +fn url_template_for( + root: Node<'_>, + bytes: &[u8], + target: &str, + class_target: Option<&str>, +) -> Option { + let mut hit: Option = None; + walk_url_registrations(root, bytes, target, class_target, &mut hit); + hit +} + +fn walk_url_registrations( + node: Node<'_>, + bytes: &[u8], + target: &str, + class_target: Option<&str>, + out: &mut Option, +) { + if out.is_some() { + return; + } + if node.kind() == "call" + && let Some(callee) = node + .child_by_field_name("function") + .and_then(|n| n.utf8_text(bytes).ok()) + { + let last = callee.rsplit_once('.').map(|(_, s)| s).unwrap_or(callee); + if matches!(last, "path" | "re_path" | "url") { + if let Some(args) = node.child_by_field_name("arguments") { + let positional = positional_args(args); + if positional.len() >= 2 { + let view_arg = positional[1]; + if view_arg_references(view_arg, bytes, target, class_target) { + if let Some(template) = first_string_arg(args, bytes) { + *out = Some(template); + return; + } + } + } + } + } + } + let mut cur = node.walk(); + for child in node.children(&mut cur) { + walk_url_registrations(child, bytes, target, class_target, out); + } +} + +fn positional_args(args: Node<'_>) -> Vec> { + let mut out = Vec::new(); + let mut cur = args.walk(); + for c in args.named_children(&mut cur) { + if c.kind() != "keyword_argument" { + out.push(c); + } + } + out +} + +fn first_string_arg(args: Node<'_>, bytes: &[u8]) -> Option { + let mut cur = args.walk(); + for c in args.named_children(&mut cur) { + if c.kind() == "string" { + let raw = c.utf8_text(bytes).ok()?; + return Some(strip_quotes(raw).to_owned()); + } + } + None +} + +fn strip_quotes(raw: &str) -> &str { + let t = raw.trim(); + let t = t.strip_prefix("b").unwrap_or(t); + let t = t.strip_prefix("r").unwrap_or(t); + let t = t.strip_prefix("u").unwrap_or(t); + t.trim_matches(['\'', '"']) +} + +fn view_arg_references( + node: Node<'_>, + bytes: &[u8], + target: &str, + class_target: Option<&str>, +) -> bool { + let Ok(text) = node.utf8_text(bytes) else { + return false; + }; + let trimmed = text.trim(); + // `MyView.as_view()` (with or without args) → strip trailing `()` + // and `.as_view` so the residual is the class name. + if let Some(class) = trimmed + .strip_suffix(')') + .and_then(|s| s.rfind('(').map(|i| &s[..i])) + .and_then(|s| s.strip_suffix(".as_view")) + { + if let Some(ct) = class_target + && class.rsplit_once('.').map(|(_, s)| s).unwrap_or(class) == ct + { + return true; + } + } + let stripped = trimmed.trim_end_matches("()"); + let last = stripped.rsplit_once('.').map(|(_, s)| s).unwrap_or(stripped); + last == target || stripped == target +} + +impl FrameworkAdapter for PythonDjangoAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Python + } + + fn detect( + &self, + summary: &FuncSummary, + ast: Node<'_>, + file_bytes: &[u8], + ) -> Option { + if !source_imports_django(file_bytes) { + return None; + } + let (func_node, _) = find_python_function(ast, file_bytes, &summary.name)?; + + // Class-based view: method named after an HTTP verb inside a + // View-derived class. + let enclosing = enclosing_class(func_node); + let cbv_class_name = enclosing + .and_then(|c| c.child_by_field_name("name")) + .and_then(|n| n.utf8_text(file_bytes).ok()) + .map(str::to_owned); + let cbv_method = http_method_from_method_name(&summary.name).filter(|_| { + enclosing + .and_then(|c| c.child_by_field_name("superclasses")) + .map(|supers| { + let mut cur = supers.walk(); + supers.named_children(&mut cur).any(|sup| { + sup.utf8_text(file_bytes) + .map(class_super_looks_like_view) + .unwrap_or(false) + }) + }) + .unwrap_or(false) + }); + + // Pick (method, path) from one of: + // - urls.py registration referencing the function + // - urls.py `ClassName.as_view()` registration referencing the enclosing class + // - class-based view method name (path falls back to `/`) + // - function-based view with `def name(request, ...):` signature + let url_template = url_template_for( + ast, + file_bytes, + &summary.name, + cbv_class_name.as_deref(), + ); + + let (method, path) = if let Some(m) = cbv_method { + (m, url_template.unwrap_or_else(|| "/".to_owned())) + } else if url_template.is_some() { + (HttpMethod::GET, url_template.unwrap()) + } else { + // Last-resort: treat any function whose first formal is + // `request` as a function-based view. This catches the + // common Django pattern in files without an inlined + // urls.py snippet. + let formals = function_formal_names(func_node, file_bytes); + if formals.first().map(String::as_str) != Some("request") { + return None; + } + (HttpMethod::GET, "/".to_owned()) + }; + + let formals = function_formal_names(func_node, file_bytes); + let request_params = bind_path_params(&formals, &path); + + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::HttpRoute, + route: Some(RouteShape { method, path }), + request_params, + response_writer: None, + middleware: Vec::new(), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::dynamic::framework::ParamSource; + + fn parse(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_python::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + fn summary(name: &str) -> FuncSummary { + FuncSummary { + name: name.into(), + lang: "python".into(), + ..Default::default() + } + } + + #[test] + fn fires_on_function_view_with_path_registration() { + let src: &[u8] = b"from django.http import HttpResponse\nfrom django.urls import path\ndef list_users(request):\n return HttpResponse(\"ok\")\nurlpatterns = [path(\"users/\", list_users)]\n"; + let tree = parse(src); + let binding = PythonDjangoAdapter + .detect(&summary("list_users"), tree.root_node(), src) + .unwrap(); + assert_eq!(binding.route.as_ref().unwrap().path, "users/"); + assert_eq!(binding.route.as_ref().unwrap().method, HttpMethod::GET); + let request_arg = binding + .request_params + .iter() + .find(|p| p.name == "request") + .unwrap(); + assert!(matches!(request_arg.source, ParamSource::Implicit)); + } + + #[test] + fn fires_on_class_based_view_get_method() { + let src: &[u8] = b"from django.views import View\nfrom django.http import HttpResponse\nclass UserView(View):\n def get(self, request, id):\n return HttpResponse(id)\n"; + let tree = parse(src); + let binding = PythonDjangoAdapter + .detect(&summary("get"), tree.root_node(), src) + .unwrap(); + assert_eq!(binding.route.as_ref().unwrap().method, HttpMethod::GET); + } + + #[test] + fn fires_on_as_view_registration() { + let src: &[u8] = b"from django.views import View\nfrom django.urls import path\nclass UserView(View):\n def get(self, request, id):\n return None\nurlpatterns = [path(\"users//\", UserView.as_view())]\n"; + let tree = parse(src); + let binding = PythonDjangoAdapter + .detect(&summary("get"), tree.root_node(), src) + .unwrap(); + let route = binding.route.unwrap(); + assert_eq!(route.path, "users//"); + let id_binding = binding + .request_params + .iter() + .find(|p| p.name == "id") + .unwrap(); + assert!(matches!(id_binding.source, ParamSource::PathSegment(_))); + } + + #[test] + fn skips_when_django_not_imported() { + let src: &[u8] = b"def list_users(request):\n return None\n"; + let tree = parse(src); + assert!(PythonDjangoAdapter + .detect(&summary("list_users"), tree.root_node(), src) + .is_none()); + } + + #[test] + fn skips_plain_helper_function() { + let src: &[u8] = b"from django.http import HttpResponse\ndef helper(x):\n return HttpResponse(x)\n"; + let tree = parse(src); + assert!(PythonDjangoAdapter + .detect(&summary("helper"), tree.root_node(), src) + .is_none()); + } +} diff --git a/src/dynamic/framework/adapters/python_fastapi.rs b/src/dynamic/framework/adapters/python_fastapi.rs new file mode 100644 index 00000000..a76e186c --- /dev/null +++ b/src/dynamic/framework/adapters/python_fastapi.rs @@ -0,0 +1,344 @@ +//! Python FastAPI [`super::super::FrameworkAdapter`] (Phase 12 — Track L.10). +//! +//! Recognises `@app.get("/path")`, `@app.post(...)`, `@router.put(...)`, +//! `@router.patch(...)`, `@router.delete(...)`, `@app.options(...)`, +//! `@app.head(...)`, `@app.websocket(...)`, and the `Depends(...)` / +//! Pydantic `BaseModel` formals that come with them. Decorator +//! detection walks the AST so the adapter sees the literal path +//! template; the per-formal [`super::super::ParamBinding`] list +//! classifies request-body-typed formals as +//! [`super::super::ParamSource::JsonBody`] when the annotation refers +//! to a class declared earlier in the same file (a strong Pydantic +//! signal) and falls back to `QueryParam(name)` otherwise. + +use crate::dynamic::framework::{ + FrameworkAdapter, FrameworkBinding, HttpMethod, ParamBinding, ParamSource, RouteShape, +}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; +use tree_sitter::Node; + +use super::python_routes::{ + bind_path_params, find_python_function, function_formal_names, source_imports_fastapi, +}; + +pub struct PythonFastApiAdapter; + +const ADAPTER_NAME: &str = "python-fastapi"; + +fn shortcut_method(attr: &str) -> Option { + match attr.to_ascii_lowercase().as_str() { + "get" => Some(HttpMethod::GET), + "head" => Some(HttpMethod::HEAD), + "post" => Some(HttpMethod::POST), + "put" => Some(HttpMethod::PUT), + "patch" => Some(HttpMethod::PATCH), + "delete" => Some(HttpMethod::DELETE), + "options" => Some(HttpMethod::OPTIONS), + "websocket" | "websocket_route" => Some(HttpMethod::GET), + _ => None, + } +} + +fn receiver_looks_like_fastapi(name: &str) -> bool { + let lower = name.to_ascii_lowercase(); + matches!( + lower.as_str(), + "app" | "application" | "router" | "api_router" + ) || lower.ends_with("_router") + || lower.ends_with("_app") +} + +fn decorator_route_shape(decorator: Node<'_>, bytes: &[u8]) -> Option<(HttpMethod, String)> { + let mut cur = decorator.walk(); + let expr = decorator.children(&mut cur).find(|c| c.kind() != "@")?; + if expr.kind() != "call" { + return None; + } + let target = expr.child_by_field_name("function")?; + let args = expr.child_by_field_name("arguments")?; + if target.kind() != "attribute" { + return None; + } + let object = target.child_by_field_name("object")?.utf8_text(bytes).ok()?; + let attr = target.child_by_field_name("attribute")?.utf8_text(bytes).ok()?; + if !receiver_looks_like_fastapi(object) { + return None; + } + let method = shortcut_method(attr)?; + let path = first_string_arg(args, bytes)?; + Some((method, path)) +} + +fn first_string_arg(args: Node<'_>, bytes: &[u8]) -> Option { + let mut cur = args.walk(); + for c in args.named_children(&mut cur) { + if c.kind() == "string" { + let raw = c.utf8_text(bytes).ok()?; + return Some(strip_quotes(raw).to_owned()); + } + } + None +} + +fn strip_quotes(raw: &str) -> &str { + let t = raw.trim(); + let t = t.strip_prefix("b").unwrap_or(t); + let t = t.strip_prefix("r").unwrap_or(t); + let t = t.strip_prefix("u").unwrap_or(t); + t.trim_matches(['\'', '"']) +} + +/// Refine per-formal bindings by inspecting the parameter list for +/// Pydantic body models and `Depends(...)` declarations. An +/// annotation pointing at a class declared in the same file is +/// treated as a `JsonBody`; an `= Depends(...)` default is treated +/// as `Implicit` (dependency-injected — not adversary-controlled +/// directly). +fn refine_for_fastapi( + func: Node<'_>, + bytes: &[u8], + file_classes: &[String], + base: Vec, +) -> Vec { + let Some(params) = func.child_by_field_name("parameters") else { + return base; + }; + let mut by_name: std::collections::HashMap = + std::collections::HashMap::new(); + let mut cur = params.walk(); + for child in params.named_children(&mut cur) { + if let Some((name, refinement)) = classify_formal(child, bytes, file_classes) { + by_name.insert(name, refinement); + } + } + base.into_iter() + .map(|b| match by_name.get(&b.name) { + Some(ParamRefinement::JsonBody) => ParamBinding { + source: ParamSource::JsonBody, + ..b + }, + Some(ParamRefinement::Implicit) => ParamBinding { + source: ParamSource::Implicit, + ..b + }, + _ => b, + }) + .collect() +} + +enum ParamRefinement { + JsonBody, + Implicit, +} + +fn classify_formal( + node: Node<'_>, + bytes: &[u8], + file_classes: &[String], +) -> Option<(String, ParamRefinement)> { + match node.kind() { + "typed_default_parameter" | "default_parameter" => { + let value = node.child_by_field_name("value")?; + let name = first_identifier(node, bytes)?; + if call_callee_text(value, bytes) + .map(|t| t.contains("Depends")) + .unwrap_or(false) + { + return Some((name, ParamRefinement::Implicit)); + } + if let Some(t) = node.child_by_field_name("type") + && let Some(ann) = t.utf8_text(bytes).ok() + && file_classes.iter().any(|c| ann.contains(c)) + { + return Some((name, ParamRefinement::JsonBody)); + } + None + } + "typed_parameter" => { + let name = first_identifier(node, bytes)?; + let t = node.child_by_field_name("type")?.utf8_text(bytes).ok()?; + if file_classes.iter().any(|c| t.contains(c)) { + return Some((name, ParamRefinement::JsonBody)); + } + None + } + _ => None, + } +} + +fn first_identifier(node: Node<'_>, bytes: &[u8]) -> Option { + let mut cur = node.walk(); + for c in node.named_children(&mut cur) { + if c.kind() == "identifier" { + return c.utf8_text(bytes).ok().map(str::to_owned); + } + } + None +} + +fn call_callee_text(node: Node<'_>, bytes: &[u8]) -> Option { + if node.kind() != "call" { + return None; + } + node.child_by_field_name("function")? + .utf8_text(bytes) + .ok() + .map(str::to_owned) +} + +/// Enumerate top-level class names so [`refine_for_fastapi`] can spot +/// Pydantic body models. Conservative: walks the file once and +/// records every `class_definition`'s name. +fn collect_class_names(root: Node<'_>, bytes: &[u8]) -> Vec { + let mut out = Vec::new(); + walk_classes(root, bytes, &mut out); + out +} + +fn walk_classes(node: Node<'_>, bytes: &[u8], out: &mut Vec) { + if node.kind() == "class_definition" + && let Some(name) = node + .child_by_field_name("name") + .and_then(|n| n.utf8_text(bytes).ok()) + { + out.push(name.to_owned()); + } + let mut cur = node.walk(); + for child in node.children(&mut cur) { + walk_classes(child, bytes, out); + } +} + +impl FrameworkAdapter for PythonFastApiAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Python + } + + fn detect( + &self, + summary: &FuncSummary, + ast: Node<'_>, + file_bytes: &[u8], + ) -> Option { + if !source_imports_fastapi(file_bytes) { + return None; + } + let (func_node, decorated_node) = find_python_function(ast, file_bytes, &summary.name)?; + let decorated = decorated_node?; + let classes = collect_class_names(ast, file_bytes); + let mut cur = decorated.walk(); + for d in decorated.children(&mut cur) { + if d.kind() != "decorator" { + continue; + } + if let Some((method, path)) = decorator_route_shape(d, file_bytes) { + let formals = function_formal_names(func_node, file_bytes); + let base = bind_path_params(&formals, &path); + let request_params = refine_for_fastapi(func_node, file_bytes, &classes, base); + return Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::HttpRoute, + route: Some(RouteShape { method, path }), + request_params, + response_writer: None, + middleware: Vec::new(), + }); + } + } + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_python::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + fn summary(name: &str) -> FuncSummary { + FuncSummary { + name: name.into(), + lang: "python".into(), + ..Default::default() + } + } + + #[test] + fn fires_on_app_get() { + let src: &[u8] = b"from fastapi import FastAPI\napp = FastAPI()\n@app.get(\"/items/{id}\")\ndef read_item(id):\n return id\n"; + let tree = parse(src); + let binding = PythonFastApiAdapter + .detect(&summary("read_item"), tree.root_node(), src) + .unwrap(); + let route = binding.route.unwrap(); + assert_eq!(route.method, HttpMethod::GET); + assert_eq!(route.path, "/items/{id}"); + let id_binding = binding + .request_params + .iter() + .find(|p| p.name == "id") + .unwrap(); + assert!(matches!(id_binding.source, ParamSource::PathSegment(_))); + } + + #[test] + fn fires_on_router_post() { + let src: &[u8] = + b"from fastapi import APIRouter\nrouter = APIRouter()\n@router.post(\"/items\")\ndef create_item(payload):\n return payload\n"; + let tree = parse(src); + let binding = PythonFastApiAdapter + .detect(&summary("create_item"), tree.root_node(), src) + .unwrap(); + assert_eq!(binding.route.unwrap().method, HttpMethod::POST); + } + + #[test] + fn pydantic_body_becomes_json_body() { + let src: &[u8] = b"from fastapi import FastAPI\nfrom pydantic import BaseModel\nclass Item(BaseModel):\n name: str\napp = FastAPI()\n@app.post(\"/items\")\ndef create_item(item: Item):\n return item\n"; + let tree = parse(src); + let binding = PythonFastApiAdapter + .detect(&summary("create_item"), tree.root_node(), src) + .unwrap(); + let item_binding = binding + .request_params + .iter() + .find(|p| p.name == "item") + .unwrap(); + assert!(matches!(item_binding.source, ParamSource::JsonBody)); + } + + #[test] + fn depends_default_becomes_implicit() { + let src: &[u8] = b"from fastapi import FastAPI, Depends\napp = FastAPI()\ndef get_db():\n return None\n@app.get(\"/items\")\ndef list_items(db = Depends(get_db)):\n return db\n"; + let tree = parse(src); + let binding = PythonFastApiAdapter + .detect(&summary("list_items"), tree.root_node(), src) + .unwrap(); + let db_binding = binding + .request_params + .iter() + .find(|p| p.name == "db") + .unwrap(); + assert!(matches!(db_binding.source, ParamSource::Implicit)); + } + + #[test] + fn skips_when_fastapi_not_imported() { + let src: &[u8] = b"from flask import Flask\napp = Flask(__name__)\n@app.get(\"/x\")\ndef x():\n return 1\n"; + let tree = parse(src); + assert!(PythonFastApiAdapter + .detect(&summary("x"), tree.root_node(), src) + .is_none()); + } +} diff --git a/src/dynamic/framework/adapters/python_flask.rs b/src/dynamic/framework/adapters/python_flask.rs new file mode 100644 index 00000000..031a0657 --- /dev/null +++ b/src/dynamic/framework/adapters/python_flask.rs @@ -0,0 +1,291 @@ +//! Python Flask [`super::super::FrameworkAdapter`] (Phase 12 — Track L.10). +//! +//! Recognises `@app.route("/path", methods=[…])` plus the verb-shortcut +//! decorators `@app.get`, `@app.post`, `@app.put`, `@app.patch`, +//! `@app.delete` on either an application object or a +//! `flask.Blueprint` (typical aliases: `app`, `application`, `bp`, +//! `blueprint`, `router`). Decorator detection walks the AST so the +//! adapter sees the literal path template + the `methods=` kwarg — +//! both of which feed [`super::super::RouteShape`] and the per-formal +//! [`super::super::ParamBinding`] list that downstream harness emitters +//! use to construct a real HTTP request. + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding, HttpMethod, RouteShape}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; +use tree_sitter::Node; + +use super::python_routes::{ + bind_path_params, find_python_function, function_formal_names, source_imports_flask, +}; + +pub struct PythonFlaskAdapter; + +const ADAPTER_NAME: &str = "python-flask"; + +/// Verb shortcuts (`@app.get` / `@app.post` / …). Excludes +/// `route` — that decorator carries the verb in a `methods=` kwarg +/// instead of in the attribute name and is handled separately. +fn shortcut_method(attr: &str) -> Option { + match attr.to_ascii_lowercase().as_str() { + "get" => Some(HttpMethod::GET), + "head" => Some(HttpMethod::HEAD), + "post" => Some(HttpMethod::POST), + "put" => Some(HttpMethod::PUT), + "patch" => Some(HttpMethod::PATCH), + "delete" => Some(HttpMethod::DELETE), + "options" => Some(HttpMethod::OPTIONS), + _ => None, + } +} + +/// Receiver names accepted on the left side of `@.route(...)`. +/// Flask convention covers `app`, `application`, plus blueprint +/// aliases (`bp`, `blueprint`, `router`). The check is permissive +/// because Phase 12 only uses the adapter to surface a route shape +/// for the harness — false positives are bounded by the +/// caller-supplied `summary` (the function must actually exist). +fn receiver_looks_like_flask(name: &str) -> bool { + let lower = name.to_ascii_lowercase(); + matches!( + lower.as_str(), + "app" | "application" | "bp" | "blueprint" | "router" + ) || lower.ends_with("_bp") + || lower.ends_with("_app") + || lower.ends_with("_blueprint") + || lower.ends_with("_router") +} + +/// Parse a single decorator node into (method, path). Returns `None` +/// when the decorator is not a Flask route decorator on a recognised +/// receiver. +fn decorator_route_shape(decorator: Node<'_>, bytes: &[u8]) -> Option<(HttpMethod, String)> { + let mut cur = decorator.walk(); + let expr = decorator.children(&mut cur).find(|c| c.kind() != "@")?; + let call = match expr.kind() { + "call" => expr, + _ => return None, + }; + let target = call.child_by_field_name("function")?; + let args = call.child_by_field_name("arguments")?; + if target.kind() != "attribute" { + return None; + } + let object = target.child_by_field_name("object")?; + let attr = target.child_by_field_name("attribute")?; + let object_text = object.utf8_text(bytes).ok()?; + let attr_text = attr.utf8_text(bytes).ok()?; + if !receiver_looks_like_flask(object_text) { + return None; + } + + let path = first_string_arg(args, bytes)?; + + if attr_text.eq_ignore_ascii_case("route") { + let method = methods_kwarg(args, bytes).unwrap_or(HttpMethod::GET); + return Some((method, path)); + } + let method = shortcut_method(attr_text)?; + Some((method, path)) +} + +fn first_string_arg(args: Node<'_>, bytes: &[u8]) -> Option { + let mut cur = args.walk(); + for c in args.named_children(&mut cur) { + if c.kind() == "string" { + return Some(strip_string_quotes(c.utf8_text(bytes).ok()?).to_owned()); + } + } + None +} + +fn strip_string_quotes(raw: &str) -> &str { + let t = raw.trim(); + let t = t.strip_prefix("b").unwrap_or(t); + let t = t.strip_prefix("r").unwrap_or(t); + let t = t.strip_prefix("u").unwrap_or(t); + t.trim_matches(['\'', '"']) +} + +fn methods_kwarg(args: Node<'_>, bytes: &[u8]) -> Option { + let mut cur = args.walk(); + for arg in args.children(&mut cur) { + if arg.kind() != "keyword_argument" { + continue; + } + let name = arg.child_by_field_name("name")?.utf8_text(bytes).ok()?; + if name != "methods" { + continue; + } + let value = arg.child_by_field_name("value")?; + let mut vc = value.walk(); + for child in value.named_children(&mut vc) { + if child.kind() == "string" { + let raw = strip_string_quotes(child.utf8_text(bytes).ok()?); + if let Some(m) = HttpMethod::from_ident(raw) { + return Some(m); + } + } + } + } + None +} + +impl FrameworkAdapter for PythonFlaskAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Python + } + + fn detect( + &self, + summary: &FuncSummary, + ast: Node<'_>, + file_bytes: &[u8], + ) -> Option { + if !source_imports_flask(file_bytes) { + return None; + } + let (func_node, decorated_node) = find_python_function(ast, file_bytes, &summary.name)?; + let decorated = decorated_node?; + let mut cur = decorated.walk(); + for d in decorated.children(&mut cur) { + if d.kind() != "decorator" { + continue; + } + if let Some((method, path)) = decorator_route_shape(d, file_bytes) { + let formals = function_formal_names(func_node, file_bytes); + let request_params = bind_path_params(&formals, &path); + return Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::HttpRoute, + route: Some(RouteShape { method, path }), + request_params, + response_writer: None, + middleware: Vec::new(), + }); + } + } + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::dynamic::framework::ParamSource; + + fn parse(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_python::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + fn summary(name: &str) -> FuncSummary { + FuncSummary { + name: name.into(), + lang: "python".into(), + ..Default::default() + } + } + + #[test] + fn fires_on_app_route_with_get_default() { + let src: &[u8] = + b"from flask import Flask\napp = Flask(__name__)\n@app.route(\"/users\")\ndef list_users():\n return []\n"; + let tree = parse(src); + let binding = PythonFlaskAdapter + .detect(&summary("list_users"), tree.root_node(), src) + .expect("binding"); + assert_eq!(binding.adapter, "python-flask"); + assert_eq!(binding.kind, EntryKind::HttpRoute); + let route = binding.route.expect("route shape"); + assert_eq!(route.method, HttpMethod::GET); + assert_eq!(route.path, "/users"); + } + + #[test] + fn fires_on_app_route_with_methods_kwarg() { + let src: &[u8] = + b"from flask import Flask\napp = Flask(__name__)\n@app.route(\"/x\", methods=[\"POST\"])\ndef save(payload):\n return payload\n"; + let tree = parse(src); + let binding = PythonFlaskAdapter + .detect(&summary("save"), tree.root_node(), src) + .expect("binding"); + let route = binding.route.unwrap(); + assert_eq!(route.method, HttpMethod::POST); + assert_eq!(route.path, "/x"); + } + + #[test] + fn fires_on_verb_shortcut_post() { + let src: &[u8] = + b"from flask import Flask\napp = Flask(__name__)\n@app.post(\"/items\")\ndef create_item(payload):\n return payload\n"; + let tree = parse(src); + let binding = PythonFlaskAdapter + .detect(&summary("create_item"), tree.root_node(), src) + .expect("binding"); + assert_eq!(binding.route.unwrap().method, HttpMethod::POST); + } + + #[test] + fn fires_on_blueprint_route() { + let src: &[u8] = + b"from flask import Blueprint\nuser_bp = Blueprint('user_bp', __name__)\n@user_bp.route(\"/users/\")\ndef get_user(id):\n return id\n"; + let tree = parse(src); + let binding = PythonFlaskAdapter + .detect(&summary("get_user"), tree.root_node(), src) + .expect("binding"); + let route = binding.route.unwrap(); + assert_eq!(route.path, "/users/"); + assert!(binding + .request_params + .iter() + .any(|p| p.name == "id" && matches!(p.source, ParamSource::PathSegment(_)))); + } + + #[test] + fn binds_path_segment_and_implicit_formal() { + let src: &[u8] = + b"from flask import Flask\napp = Flask(__name__)\n@app.route(\"/users/\")\ndef show(id, extra=\"x\"):\n return id\n"; + let tree = parse(src); + let binding = PythonFlaskAdapter + .detect(&summary("show"), tree.root_node(), src) + .expect("binding"); + let id_binding = binding + .request_params + .iter() + .find(|p| p.name == "id") + .unwrap(); + assert!(matches!(id_binding.source, ParamSource::PathSegment(_))); + let extra_binding = binding + .request_params + .iter() + .find(|p| p.name == "extra") + .unwrap(); + assert!(matches!(extra_binding.source, ParamSource::QueryParam(_))); + } + + #[test] + fn skips_when_flask_not_imported() { + let src: &[u8] = b"def add(a, b):\n return a + b\n"; + let tree = parse(src); + assert!(PythonFlaskAdapter + .detect(&summary("add"), tree.root_node(), src) + .is_none()); + } + + #[test] + fn skips_when_function_has_no_decorator() { + let src: &[u8] = b"from flask import Flask\napp = Flask(__name__)\ndef helper(x):\n return x\n"; + let tree = parse(src); + assert!(PythonFlaskAdapter + .detect(&summary("helper"), tree.root_node(), src) + .is_none()); + } +} diff --git a/src/dynamic/framework/adapters/python_routes.rs b/src/dynamic/framework/adapters/python_routes.rs new file mode 100644 index 00000000..53fbf318 --- /dev/null +++ b/src/dynamic/framework/adapters/python_routes.rs @@ -0,0 +1,327 @@ +//! Shared Python-route adapter helpers (Phase 12 — Track L.10). +//! +//! The Flask / Django / FastAPI / Starlette adapters all need the same +//! handful of tree-sitter helpers: locate a `function_definition` by +//! name, peek at its parent `decorated_definition` for decorator data, +//! enumerate formal parameter names, and bind a path template's +//! placeholders to those formals. Centralising the helpers here keeps +//! the four adapters terse and lets every framework share the same +//! placeholder-binding semantics (so an unmatched formal becomes a +//! `QueryParam(name)` everywhere, not just in one adapter). + +use crate::dynamic::framework::{ParamBinding, ParamSource}; +use tree_sitter::Node; + +/// True when `bytes` carries any of the well-known Flask import +/// stanzas. Used by [`super::python_flask::PythonFlaskAdapter`] to +/// short-circuit non-Flask Python files before the AST walk. +pub fn source_imports_flask(bytes: &[u8]) -> bool { + contains_any( + bytes, + &[ + b"from flask", + b"import flask", + b"Flask(", + b"Blueprint(", + b"flask.Blueprint", + ], + ) +} + +/// True when `bytes` carries any of the well-known FastAPI import +/// stanzas. +pub fn source_imports_fastapi(bytes: &[u8]) -> bool { + contains_any( + bytes, + &[b"from fastapi", b"import fastapi", b"FastAPI(", b"APIRouter("], + ) +} + +/// True when `bytes` carries any of the well-known Django import +/// stanzas — including the `urls.py` `path(` / `re_path(` / `url(` +/// registration helpers that the Django adapter consults. +pub fn source_imports_django(bytes: &[u8]) -> bool { + contains_any( + bytes, + &[ + b"from django", + b"import django", + b"django.http", + b"django.urls", + b"django.views", + b"django.shortcuts", + b"urlpatterns", + ], + ) +} + +/// True when `bytes` carries any of the well-known Starlette import +/// stanzas. Excludes the FastAPI-only imports so the Starlette +/// adapter does not collide with FastAPI files that re-export +/// Starlette types. +pub fn source_imports_starlette(bytes: &[u8]) -> bool { + contains_any( + bytes, + &[ + b"from starlette", + b"import starlette", + b"Starlette(", + b"starlette.routing", + b"starlette.applications", + ], + ) +} + +fn contains_any(haystack: &[u8], needles: &[&[u8]]) -> bool { + needles + .iter() + .any(|n| haystack.windows(n.len()).any(|w| w == *n)) +} + +/// Find the `function_definition` node whose `name` field equals +/// `target`. Returns `(func_node, Option)` — +/// the decorated parent is `Some` when the function carries one or +/// more decorators. +pub fn find_python_function<'a>( + root: Node<'a>, + bytes: &[u8], + target: &str, +) -> Option<(Node<'a>, Option>)> { + walk(root, bytes, target) +} + +fn walk<'a>(node: Node<'a>, bytes: &[u8], target: &str) -> Option<(Node<'a>, Option>)> { + if node.kind() == "function_definition" { + if let Some(name) = node + .child_by_field_name("name") + .and_then(|n| n.utf8_text(bytes).ok()) + { + if name == target { + let decorated = node.parent().filter(|p| p.kind() == "decorated_definition"); + return Some((node, decorated)); + } + } + } + let mut cur = node.walk(); + for child in node.children(&mut cur) { + if let Some(found) = walk(child, bytes, target) { + return Some(found); + } + } + None +} + +/// Enumerate formal parameter names from a `function_definition` node. +/// Skips `self`/`cls` so class-based handler methods bind only the +/// adversary-controlled formals. +pub fn function_formal_names(func: Node<'_>, bytes: &[u8]) -> Vec { + let mut out = Vec::new(); + let Some(parameters) = func.child_by_field_name("parameters") else { + return out; + }; + let mut cur = parameters.walk(); + for child in parameters.named_children(&mut cur) { + if let Some(name) = parameter_name(child, bytes) { + if name == "self" || name == "cls" { + continue; + } + out.push(name); + } + } + out +} + +fn parameter_name(node: Node<'_>, bytes: &[u8]) -> Option { + match node.kind() { + "identifier" => node.utf8_text(bytes).ok().map(str::to_owned), + "default_parameter" + | "typed_parameter" + | "typed_default_parameter" + | "list_splat_pattern" + | "dictionary_splat_pattern" => { + // Each of these wraps either a plain identifier or another + // structure whose first identifier is the parameter name. + let mut cur = node.walk(); + for c in node.named_children(&mut cur) { + if c.kind() == "identifier" { + return c.utf8_text(bytes).ok().map(str::to_owned); + } + if let Some(n) = parameter_name(c, bytes) { + return Some(n); + } + } + None + } + _ => None, + } +} + +/// Bind formals to request slots given a route path template. +/// +/// Accepts both Flask-style placeholders (``, ``) and +/// FastAPI/Starlette/Django-style placeholders (`{id}`, ``). +/// A formal whose name matches a placeholder becomes a +/// [`ParamSource::PathSegment`]; an unmatched formal becomes a +/// [`ParamSource::QueryParam`] of the same name so downstream +/// harness emitters have a deterministic slot to populate. +pub fn bind_path_params(formals: &[String], path: &str) -> Vec { + let placeholders = extract_path_placeholders(path); + formals + .iter() + .enumerate() + .map(|(idx, name)| { + let source = if name == "request" || name == "req" { + ParamSource::Implicit + } else if placeholders.iter().any(|p| p == name) { + ParamSource::PathSegment(name.clone()) + } else { + ParamSource::QueryParam(name.clone()) + }; + ParamBinding { + index: idx, + name: name.clone(), + source, + } + }) + .collect() +} + +/// Extract placeholder names from a route path template. +/// +/// Supports three placeholder syntaxes: +/// - Flask: `/users/`, `/users/` → `id` +/// - FastAPI / Starlette: `/users/{id}` → `id` +/// - Django: ``, `` (same as Flask) plus regex +/// `(?P...)` capture groups. +/// +/// Names are deduplicated while preserving first-occurrence order +/// so a single placeholder reused across the path (or matched by +/// two scanners on the same span — e.g. `(?P...)`) does not +/// double-bind a formal. +pub fn extract_path_placeholders(path: &str) -> Vec { + let mut out: Vec = Vec::new(); + let mut push = |name: String| { + if !name.is_empty() && !out.iter().any(|n| n == &name) { + out.push(name); + } + }; + let bytes = path.as_bytes(); + let mut i = 0; + while i < bytes.len() { + match bytes[i] { + b'<' => { + // Skip the `<` that opens a Django named capture + // group `(?P...)` — the `(?P` scan below + // handles it. The two preceding bytes encode the + // `?P` marker. + let in_named_group = i >= 2 && &bytes[i - 2..i] == b"?P"; + if let Some(end) = bytes[i + 1..].iter().position(|&b| b == b'>') { + if !in_named_group { + let inner = &path[i + 1..i + 1 + end]; + let name = inner.rsplit_once(':').map(|(_, n)| n).unwrap_or(inner); + push(name.to_owned()); + } + i += end + 2; + continue; + } + } + b'{' => { + if let Some(end) = bytes[i + 1..].iter().position(|&b| b == b'}') { + let inner = &path[i + 1..i + 1 + end]; + let name = inner.split(':').next().unwrap_or(inner); + push(name.to_owned()); + i += end + 2; + continue; + } + } + _ => {} + } + i += 1; + } + let mut rest = path; + while let Some(pos) = rest.find("(?P<") { + let after = &rest[pos + 4..]; + if let Some(end) = after.find('>') { + push(after[..end].to_owned()); + rest = &after[end + 1..]; + } else { + break; + } + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_python::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + #[test] + fn finds_decorated_function() { + let src: &[u8] = b"@dec\ndef target(a, b):\n return a + b\n"; + let tree = parse(src); + let (_func, decorated) = find_python_function(tree.root_node(), src, "target").unwrap(); + assert!(decorated.is_some()); + } + + #[test] + fn finds_function_without_decorator() { + let src: &[u8] = b"def target(a):\n return a\n"; + let tree = parse(src); + let (_func, decorated) = find_python_function(tree.root_node(), src, "target").unwrap(); + assert!(decorated.is_none()); + } + + #[test] + fn skips_self_and_cls() { + let src: &[u8] = b"class X:\n def m(self, a, b):\n return a + b\n"; + let tree = parse(src); + let (func, _) = find_python_function(tree.root_node(), src, "m").unwrap(); + let names = function_formal_names(func, src); + assert_eq!(names, vec!["a", "b"]); + } + + #[test] + fn extracts_flask_placeholders() { + let p = extract_path_placeholders("/users/"); + assert_eq!(p, vec!["id"]); + let p = extract_path_placeholders("/items//"); + assert_eq!(p, vec!["id", "slug"]); + } + + #[test] + fn extracts_fastapi_placeholders() { + let p = extract_path_placeholders("/users/{id}"); + assert_eq!(p, vec!["id"]); + let p = extract_path_placeholders("/items/{id:int}"); + assert_eq!(p, vec!["id"]); + } + + #[test] + fn extracts_django_regex_placeholders() { + let p = extract_path_placeholders(r"^/users/(?P\d+)/?$"); + assert_eq!(p, vec!["id"]); + } + + #[test] + fn binds_known_placeholder_as_path_segment() { + let formals = vec!["id".to_string(), "extra".to_string()]; + let bindings = bind_path_params(&formals, "/users/{id}"); + assert!(matches!(bindings[0].source, ParamSource::PathSegment(_))); + assert!(matches!(bindings[1].source, ParamSource::QueryParam(_))); + } + + #[test] + fn binds_request_as_implicit() { + let formals = vec!["request".to_string(), "id".to_string()]; + let bindings = bind_path_params(&formals, "/users/{id}"); + assert!(matches!(bindings[0].source, ParamSource::Implicit)); + assert!(matches!(bindings[1].source, ParamSource::PathSegment(_))); + } +} diff --git a/src/dynamic/framework/adapters/python_starlette.rs b/src/dynamic/framework/adapters/python_starlette.rs new file mode 100644 index 00000000..1d7b916d --- /dev/null +++ b/src/dynamic/framework/adapters/python_starlette.rs @@ -0,0 +1,265 @@ +//! Python Starlette [`super::super::FrameworkAdapter`] (Phase 12 — Track L.10). +//! +//! Recognises `Route("/path", endpoint=handler)` and +//! `Route("/path", handler)` registrations inside a Starlette +//! application file (`from starlette.routing import Route` / +//! `from starlette.applications import Starlette`). Detection walks +//! every `call` node in the AST so the order of declaration relative +//! to the handler does not matter. Methods are picked up from the +//! `methods=[...]` kwarg when present and default to `GET`. + +use crate::dynamic::framework::{ + FrameworkAdapter, FrameworkBinding, HttpMethod, RouteShape, +}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; +use tree_sitter::Node; + +use super::python_routes::{ + bind_path_params, find_python_function, function_formal_names, source_imports_starlette, +}; + +pub struct PythonStarletteAdapter; + +const ADAPTER_NAME: &str = "python-starlette"; + +/// Find a `Route("/path", endpoint=target)` or +/// `Route("/path", target)` call and return its `(method, path)`. +/// Returns `None` when no matching call is present. +fn route_registration_for( + root: Node<'_>, + bytes: &[u8], + target: &str, +) -> Option<(HttpMethod, String)> { + let mut hit: Option<(HttpMethod, String)> = None; + walk_routes(root, bytes, target, &mut hit); + hit +} + +fn walk_routes(node: Node<'_>, bytes: &[u8], target: &str, out: &mut Option<(HttpMethod, String)>) { + if out.is_some() { + return; + } + if node.kind() == "call" + && let Some(callee) = node + .child_by_field_name("function") + .and_then(|n| n.utf8_text(bytes).ok()) + { + let last = callee.rsplit_once('.').map(|(_, s)| s).unwrap_or(callee); + if matches!(last, "Route" | "WebSocketRoute") { + if let Some(args) = node.child_by_field_name("arguments") { + if let Some(path) = first_string_arg(args, bytes) { + if endpoint_references(args, bytes, target) { + let method = methods_kwarg(args, bytes).unwrap_or(HttpMethod::GET); + *out = Some((method, path)); + return; + } + } + } + } + } + let mut cur = node.walk(); + for child in node.children(&mut cur) { + walk_routes(child, bytes, target, out); + } +} + +fn first_string_arg(args: Node<'_>, bytes: &[u8]) -> Option { + let mut cur = args.walk(); + for c in args.named_children(&mut cur) { + if c.kind() == "string" { + let raw = c.utf8_text(bytes).ok()?; + return Some(strip_quotes(raw).to_owned()); + } + } + None +} + +fn strip_quotes(raw: &str) -> &str { + let t = raw.trim(); + let t = t.strip_prefix("b").unwrap_or(t); + let t = t.strip_prefix("r").unwrap_or(t); + let t = t.strip_prefix("u").unwrap_or(t); + t.trim_matches(['\'', '"']) +} + +fn endpoint_references(args: Node<'_>, bytes: &[u8], target: &str) -> bool { + let mut cur = args.walk(); + let mut seen_positional = 0usize; + for arg in args.named_children(&mut cur) { + if arg.kind() == "keyword_argument" { + let Some(name) = arg.child_by_field_name("name") else { + continue; + }; + let Ok(name_text) = name.utf8_text(bytes) else { + continue; + }; + if name_text == "endpoint" { + if let Some(value) = arg.child_by_field_name("value") { + if identifier_matches(value, bytes, target) { + return true; + } + } + } + } else { + seen_positional += 1; + // Second positional argument is the endpoint when no + // keyword form is used. + if seen_positional == 2 && identifier_matches(arg, bytes, target) { + return true; + } + } + } + false +} + +fn identifier_matches(node: Node<'_>, bytes: &[u8], target: &str) -> bool { + let Ok(text) = node.utf8_text(bytes) else { + return false; + }; + let trimmed = text.trim().trim_end_matches("()"); + let last = trimmed.rsplit_once('.').map(|(_, s)| s).unwrap_or(trimmed); + last == target || trimmed == target +} + +fn methods_kwarg(args: Node<'_>, bytes: &[u8]) -> Option { + let mut cur = args.walk(); + for arg in args.children(&mut cur) { + if arg.kind() != "keyword_argument" { + continue; + } + let Some(name) = arg + .child_by_field_name("name") + .and_then(|n| n.utf8_text(bytes).ok()) + else { + continue; + }; + if name != "methods" { + continue; + } + let Some(value) = arg.child_by_field_name("value") else { + continue; + }; + let mut vc = value.walk(); + for child in value.named_children(&mut vc) { + if child.kind() == "string" + && let Some(raw) = child.utf8_text(bytes).ok() + && let Some(m) = HttpMethod::from_ident(strip_quotes(raw)) + { + return Some(m); + } + } + } + None +} + +impl FrameworkAdapter for PythonStarletteAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Python + } + + fn detect( + &self, + summary: &FuncSummary, + ast: Node<'_>, + file_bytes: &[u8], + ) -> Option { + if !source_imports_starlette(file_bytes) { + return None; + } + let (func_node, _) = find_python_function(ast, file_bytes, &summary.name)?; + let (method, path) = route_registration_for(ast, file_bytes, &summary.name)?; + let formals = function_formal_names(func_node, file_bytes); + let request_params = bind_path_params(&formals, &path); + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::HttpRoute, + route: Some(RouteShape { method, path }), + request_params, + response_writer: None, + middleware: Vec::new(), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::dynamic::framework::ParamSource; + + fn parse(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_python::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + fn summary(name: &str) -> FuncSummary { + FuncSummary { + name: name.into(), + lang: "python".into(), + ..Default::default() + } + } + + #[test] + fn fires_on_route_with_keyword_endpoint() { + let src: &[u8] = b"from starlette.applications import Starlette\nfrom starlette.routing import Route\nasync def homepage(request):\n return None\napp = Starlette(routes=[Route(\"/\", endpoint=homepage)])\n"; + let tree = parse(src); + let binding = PythonStarletteAdapter + .detect(&summary("homepage"), tree.root_node(), src) + .unwrap(); + let route = binding.route.unwrap(); + assert_eq!(route.path, "/"); + assert_eq!(route.method, HttpMethod::GET); + } + + #[test] + fn fires_on_route_with_positional_endpoint() { + let src: &[u8] = b"from starlette.routing import Route\nasync def homepage(request):\n return None\nroutes = [Route(\"/items/{id}\", homepage)]\n"; + let tree = parse(src); + let binding = PythonStarletteAdapter + .detect(&summary("homepage"), tree.root_node(), src) + .unwrap(); + assert_eq!(binding.route.unwrap().path, "/items/{id}"); + } + + #[test] + fn picks_up_post_methods_kwarg() { + let src: &[u8] = b"from starlette.routing import Route\nasync def create(request):\n return None\nroutes = [Route(\"/items\", endpoint=create, methods=[\"POST\"])]\n"; + let tree = parse(src); + let binding = PythonStarletteAdapter + .detect(&summary("create"), tree.root_node(), src) + .unwrap(); + assert_eq!(binding.route.unwrap().method, HttpMethod::POST); + } + + #[test] + fn binds_request_as_implicit() { + let src: &[u8] = b"from starlette.routing import Route\nasync def homepage(request):\n return None\nroutes = [Route(\"/\", endpoint=homepage)]\n"; + let tree = parse(src); + let binding = PythonStarletteAdapter + .detect(&summary("homepage"), tree.root_node(), src) + .unwrap(); + let req = binding + .request_params + .iter() + .find(|p| p.name == "request") + .unwrap(); + assert!(matches!(req.source, ParamSource::Implicit)); + } + + #[test] + fn skips_when_starlette_not_imported() { + let src: &[u8] = b"def homepage(request):\n return None\n"; + let tree = parse(src); + assert!(PythonStarletteAdapter + .detect(&summary("homepage"), tree.root_node(), src) + .is_none()); + } +} diff --git a/src/dynamic/framework/mod.rs b/src/dynamic/framework/mod.rs index 7b10704c..8b97a092 100644 --- a/src/dynamic/framework/mod.rs +++ b/src/dynamic/framework/mod.rs @@ -214,15 +214,14 @@ mod tests { } #[test] - fn registry_baseline_after_phase_10() { - // Phase 10 (Track J.8) adds three prototype-pollution - // adapters (`pp-lodash-merge`, `pp-object-assign`, - // `pp-json-deep-assign`) to both the JavaScript and - // TypeScript slices. Java / Python / PHP each still carry - // the J.1..J.7 adapters (7 entries); Ruby still has 5; Go - // still has 3; Rust still has 2. JavaScript grows from 4 → - // 7; TypeScript grows from 0 → 3. C / Cpp stay empty. - for lang in [Lang::Java, Lang::Python, Lang::Php] { + fn registry_baseline_after_phase_12() { + // Phase 12 (Track L.10) adds four Python framework adapters + // (`python-django`, `python-fastapi`, `python-flask`, + // `python-starlette`) to the Python slice, growing it from + // 7 → 11. Java / PHP keep their 7-entry J.1..J.7 stacks; + // Ruby keeps 5; Go keeps 3; Rust keeps 2; JavaScript keeps 7; + // TypeScript keeps 3. C / Cpp stay empty. + for lang in [Lang::Java, Lang::Php] { let registered = registry::adapters_for(lang); assert_eq!( registered.len(), @@ -234,6 +233,15 @@ mod tests { assert_eq!(adapter.lang(), lang); } } + let python_registered = registry::adapters_for(Lang::Python); + assert_eq!( + python_registered.len(), + 11, + "Python must have J.1..J.7 (7) + L.10 Flask/Django/FastAPI/Starlette (4)", + ); + for adapter in python_registered { + assert_eq!(adapter.lang(), Lang::Python); + } let ruby_registered = registry::adapters_for(Lang::Ruby); assert_eq!( ruby_registered.len(), diff --git a/src/dynamic/framework/registry.rs b/src/dynamic/framework/registry.rs index 2a970278..88d2e7e3 100644 --- a/src/dynamic/framework/registry.rs +++ b/src/dynamic/framework/registry.rs @@ -76,8 +76,12 @@ static PHP: &[&dyn FrameworkAdapter] = &[ static PYTHON: &[&dyn FrameworkAdapter] = &[ &super::adapters::HeaderPythonAdapter, &super::adapters::LdapPythonAdapter, + &super::adapters::PythonDjangoAdapter, + &super::adapters::PythonFastApiAdapter, + &super::adapters::PythonFlaskAdapter, &super::adapters::PythonJinja2Adapter, &super::adapters::PythonPickleAdapter, + &super::adapters::PythonStarletteAdapter, &super::adapters::RedirectPythonAdapter, &super::adapters::XpathPythonAdapter, &super::adapters::XxePythonAdapter, diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index ebb79009..e8e00a61 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -136,6 +136,12 @@ pub enum PythonShape { /// FastAPI `@app.get` / `@router.post` / etc. Harness uses /// `starlette.testclient.TestClient` to drive the route. FastApiRoute, + /// Pure Starlette application (`Starlette(routes=[Route(...)])`). + /// Harness uses `starlette.testclient.TestClient` to drive the + /// route. Distinguished from [`Self::FastApiRoute`] because the + /// app resolver looks up `starlette.applications.Starlette` + /// instances rather than `fastapi.FastAPI` instances. + StarletteRoute, /// Django view (function or `View`/`APIView` method). Harness /// instantiates a `django.test.RequestFactory` and calls the view. DjangoView, @@ -180,6 +186,16 @@ impl PythonShape { source, &["from fastapi", "import fastapi", "FastAPI(", "APIRouter("], ); + let has_starlette = source_has_marker( + source, + &[ + "from starlette", + "import starlette", + "Starlette(", + "starlette.routing", + "starlette.applications", + ], + ); let has_django = source_has_marker( source, &[ @@ -201,6 +217,9 @@ impl PythonShape { if has_django { return Self::DjangoView; } + if has_starlette { + return Self::StarletteRoute; + } if has_flask { return Self::FlaskRoute; } @@ -1265,6 +1284,10 @@ fn extra_files_for_shape(shape: PythonShape) -> Vec<(String, String)> { "requirements.txt".to_owned(), "fastapi\nhttpx\n".to_owned(), )], + PythonShape::StarletteRoute => vec![( + "requirements.txt".to_owned(), + "starlette\nhttpx\n".to_owned(), + )], PythonShape::DjangoView => vec![("requirements.txt".to_owned(), "Django\n".to_owned())], PythonShape::CeleryTask => vec![("requirements.txt".to_owned(), "celery\n".to_owned())], // Generic / CLI / Pytest / Async use the stdlib only. @@ -1282,6 +1305,7 @@ fn generate_for_shape(spec: &HarnessSpec, shape: PythonShape) -> String { PythonShape::CeleryTask => emit_celery(spec), PythonShape::FlaskRoute => emit_flask(spec), PythonShape::FastApiRoute => emit_fastapi(spec), + PythonShape::StarletteRoute => emit_starlette(spec), PythonShape::DjangoView => emit_django(spec), }; let postamble = harness_postamble(); @@ -1645,6 +1669,81 @@ except Exception as _e: ) } +fn emit_starlette(spec: &HarnessSpec) -> String { + let entry_fn = &spec.entry_name; + let (method, query_name, body_kind) = resolve_http_payload(&spec.payload_slot); + format!( + r#"# Shape: Starlette route — dispatch via starlette.testclient.TestClient. +def _nyx_resolve_starlette_app(mod): + try: + from starlette.applications import Starlette + except ImportError: + return None + for n in ("app", "application"): + v = getattr(mod, n, None) + if isinstance(v, Starlette): + return v + for attr in dir(mod): + val = getattr(mod, attr, None) + if isinstance(val, Starlette): + return val + return None + +_app = _nyx_resolve_starlette_app(_entry_mod) +if _app is None: + print("NYX_STARLETTE_APP_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(78) + +try: + from starlette.testclient import TestClient +except ImportError: + print("NYX_STARLETTE_TESTCLIENT_MISSING", file=sys.stderr, flush=True) + sys.exit(79) + +_path = None +for _r in _app.routes: + _name = getattr(_r, "name", None) + _endpoint = getattr(_r, "endpoint", None) + _endpoint_name = getattr(_endpoint, "__name__", None) + if _name == {entry_fn:?} or _endpoint_name == {entry_fn:?}: + _path = getattr(_r, "path", None) + break +if _path is None and _app.routes: + _path = getattr(_app.routes[0], "path", None) +if _path is None: + print("NYX_STARLETTE_ROUTE_NOT_FOUND", file=sys.stderr, flush=True) + sys.exit(80) + +import re +if {body_kind:?} == "path": + _path = re.sub(r"\{{[^}}]+\}}", payload, _path, count=1) +else: + _path = re.sub(r"\{{[^}}]+\}}", "x", _path) + +_client = TestClient(_app, raise_server_exceptions=False) +_method = {method:?} +_query = {{}} +_body = None +if {body_kind:?} == "query": + _query[{query_name:?}] = payload +elif {body_kind:?} == "body": + _body = payload +elif {body_kind:?} == "env": + os.environ[{query_name:?}] = payload +try: + _resp = _client.request(_method, _path, params=_query, content=_body) + try: + print(_resp.text, flush=True) + except Exception: + pass +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) +"# + ) +} + fn emit_django(spec: &HarnessSpec) -> String { let entry_fn = &spec.entry_name; let (method, query_name, body_kind) = resolve_http_payload(&spec.payload_slot); @@ -1945,6 +2044,13 @@ mod tests { assert_eq!(PythonShape::detect(&spec, src), PythonShape::DjangoView); } + #[test] + fn shape_detect_starlette() { + let src = "from starlette.applications import Starlette\nfrom starlette.routing import Route\nasync def index(request): pass\napp = Starlette(routes=[Route('/', index)])\n"; + let spec = make_spec_with(EntryKind::HttpRoute, "index"); + assert_eq!(PythonShape::detect(&spec, src), PythonShape::StarletteRoute); + } + #[test] fn shape_detect_cli() { let src = "def main():\n pass\nif __name__ == \"__main__\":\n main()\n"; @@ -2059,6 +2165,23 @@ mod tests { .any(|(p, c)| p == "requirements.txt" && c.contains("fastapi") && c.contains("httpx"))); } + #[test] + fn starlette_shape_emits_test_client() { + let spec = make_spec_with(EntryKind::HttpRoute, "homepage"); + let src = generate_for_shape(&spec, PythonShape::StarletteRoute); + assert!(src.contains("starlette.testclient")); + assert!(src.contains("TestClient")); + assert!(src.contains("Starlette")); + } + + #[test] + fn extra_files_starlette_pins_httpx() { + let extras = extra_files_for_shape(PythonShape::StarletteRoute); + assert!(extras.iter().any( + |(p, c)| p == "requirements.txt" && c.contains("starlette") && c.contains("httpx") + )); + } + fn make_spec_with(kind: EntryKind, name: &str) -> HarnessSpec { let mut s = make_spec(PayloadSlot::Param(0)); s.entry_kind = kind; diff --git a/tests/dynamic_fixtures/python_frameworks/django/benign.py b/tests/dynamic_fixtures/python_frameworks/django/benign.py new file mode 100644 index 00000000..1a104437 --- /dev/null +++ b/tests/dynamic_fixtures/python_frameworks/django/benign.py @@ -0,0 +1,22 @@ +"""Phase 12 (Track L.10) — Django CMDI benign fixture. + +`run_cmd(request)` reads `request.GET["cmd"]` but rejects anything +outside an allowlist before invoking `subprocess.run` with a fixed +argv, so the sink call is unreachable for attacker-controlled values. +""" +import subprocess +from django.http import HttpResponse +from django.urls import path + +_ALLOW = {"status", "uptime", "version"} + + +def run_cmd(request): + cmd = request.GET.get("cmd", "") + if cmd not in _ALLOW: + return HttpResponse("rejected", status=400) + subprocess.run(["/usr/bin/echo", cmd], check=False) + return HttpResponse("ok") + + +urlpatterns = [path("run/", run_cmd)] diff --git a/tests/dynamic_fixtures/python_frameworks/django/vuln.py b/tests/dynamic_fixtures/python_frameworks/django/vuln.py new file mode 100644 index 00000000..6aec9aa2 --- /dev/null +++ b/tests/dynamic_fixtures/python_frameworks/django/vuln.py @@ -0,0 +1,18 @@ +"""Phase 12 (Track L.10) — Django CMDI vuln fixture. + +`run_cmd(request)` reads `request.GET["cmd"]` and pipes it straight to +`os.system`. Adapter binding: `path("run/", run_cmd)` registration with +`cmd` flowing through `request.GET`. +""" +import os +from django.http import HttpResponse +from django.urls import path + + +def run_cmd(request): + cmd = request.GET.get("cmd", "") + os.system(cmd) + return HttpResponse("ok") + + +urlpatterns = [path("run/", run_cmd)] diff --git a/tests/dynamic_fixtures/python_frameworks/fastapi/benign.py b/tests/dynamic_fixtures/python_frameworks/fastapi/benign.py new file mode 100644 index 00000000..d4bc3f29 --- /dev/null +++ b/tests/dynamic_fixtures/python_frameworks/fastapi/benign.py @@ -0,0 +1,20 @@ +"""Phase 12 (Track L.10) — FastAPI CMDI benign fixture. + +`GET /run?cmd=<...>` rejects anything outside an allowlist before +invoking `subprocess.run` with a fixed argv, so the sink call is +unreachable for attacker-controlled values. +""" +import subprocess +from fastapi import FastAPI + +app = FastAPI() + +_ALLOW = {"status", "uptime", "version"} + + +@app.get("/run") +def run_cmd(cmd: str = ""): + if cmd not in _ALLOW: + return {"rejected": True} + subprocess.run(["/usr/bin/echo", cmd], check=False) + return {"ok": True} diff --git a/tests/dynamic_fixtures/python_frameworks/fastapi/vuln.py b/tests/dynamic_fixtures/python_frameworks/fastapi/vuln.py new file mode 100644 index 00000000..65fdc981 --- /dev/null +++ b/tests/dynamic_fixtures/python_frameworks/fastapi/vuln.py @@ -0,0 +1,16 @@ +"""Phase 12 (Track L.10) — FastAPI CMDI vuln fixture. + +`GET /run?cmd=<...>` forwards the `cmd` query parameter straight into +`os.system`. Adapter binding: `@app.get("/run")` with `cmd` flowing +through the function formal. +""" +import os +from fastapi import FastAPI + +app = FastAPI() + + +@app.get("/run") +def run_cmd(cmd: str = ""): + os.system(cmd) + return {"ok": True} diff --git a/tests/dynamic_fixtures/python_frameworks/flask/benign.py b/tests/dynamic_fixtures/python_frameworks/flask/benign.py new file mode 100644 index 00000000..339ff07b --- /dev/null +++ b/tests/dynamic_fixtures/python_frameworks/flask/benign.py @@ -0,0 +1,21 @@ +"""Phase 12 (Track L.10) — Flask CMDI benign fixture. + +The `/run` route accepts a `cmd` query parameter but rejects everything +outside an allowlist before invoking `subprocess.run` with a fixed argv, +so the sink call is unreachable for attacker-controlled values. +""" +import subprocess +from flask import Flask, request + +app = Flask(__name__) + +_ALLOW = {"status", "uptime", "version"} + + +@app.route("/run", methods=["GET"]) +def run_cmd(): + cmd = request.args.get("cmd", "") + if cmd not in _ALLOW: + return "rejected", 400 + subprocess.run(["/usr/bin/echo", cmd], check=False) + return "ok" diff --git a/tests/dynamic_fixtures/python_frameworks/flask/vuln.py b/tests/dynamic_fixtures/python_frameworks/flask/vuln.py new file mode 100644 index 00000000..95e54ac5 --- /dev/null +++ b/tests/dynamic_fixtures/python_frameworks/flask/vuln.py @@ -0,0 +1,18 @@ +"""Phase 12 (Track L.10) — Flask CMDI vuln fixture. + +The `/run` route forwards a `cmd` query parameter straight into +`os.system`, so any attacker who reaches the route can execute +arbitrary shell. Adapter binding: `@app.route("/run", methods=["GET"])` +with `cmd` flowing through `request.args.get`. +""" +import os +from flask import Flask, request + +app = Flask(__name__) + + +@app.route("/run", methods=["GET"]) +def run_cmd(): + cmd = request.args.get("cmd", "") + os.system(cmd) + return "ok" diff --git a/tests/dynamic_fixtures/python_frameworks/starlette/benign.py b/tests/dynamic_fixtures/python_frameworks/starlette/benign.py new file mode 100644 index 00000000..3704171e --- /dev/null +++ b/tests/dynamic_fixtures/python_frameworks/starlette/benign.py @@ -0,0 +1,23 @@ +"""Phase 12 (Track L.10) — Starlette CMDI benign fixture. + +`run_cmd(request)` reads the `cmd` query parameter but rejects anything +outside an allowlist before invoking `subprocess.run` with a fixed +argv, so the sink call is unreachable for attacker-controlled values. +""" +import subprocess +from starlette.applications import Starlette +from starlette.responses import PlainTextResponse +from starlette.routing import Route + +_ALLOW = {"status", "uptime", "version"} + + +async def run_cmd(request): + cmd = request.query_params.get("cmd", "") + if cmd not in _ALLOW: + return PlainTextResponse("rejected", status_code=400) + subprocess.run(["/usr/bin/echo", cmd], check=False) + return PlainTextResponse("ok") + + +app = Starlette(routes=[Route("/run", endpoint=run_cmd)]) diff --git a/tests/dynamic_fixtures/python_frameworks/starlette/vuln.py b/tests/dynamic_fixtures/python_frameworks/starlette/vuln.py new file mode 100644 index 00000000..9398fb09 --- /dev/null +++ b/tests/dynamic_fixtures/python_frameworks/starlette/vuln.py @@ -0,0 +1,19 @@ +"""Phase 12 (Track L.10) — Starlette CMDI vuln fixture. + +`run_cmd(request)` reads the `cmd` query parameter and pipes it +straight to `os.system`. Adapter binding: `Route("/run", endpoint=run_cmd)` +registration with `cmd` flowing through `request.query_params`. +""" +import os +from starlette.applications import Starlette +from starlette.responses import PlainTextResponse +from starlette.routing import Route + + +async def run_cmd(request): + cmd = request.query_params.get("cmd", "") + os.system(cmd) + return PlainTextResponse("ok") + + +app = Starlette(routes=[Route("/run", endpoint=run_cmd)]) diff --git a/tests/python_frameworks_corpus.rs b/tests/python_frameworks_corpus.rs new file mode 100644 index 00000000..e684f19d --- /dev/null +++ b/tests/python_frameworks_corpus.rs @@ -0,0 +1,170 @@ +//! Phase 12 (Track L.10) — Python framework adapter integration tests. +//! +//! Each test exercises `detect_binding` end-to-end against a fixture +//! file under `tests/dynamic_fixtures/python_frameworks/`, asserting +//! that the right adapter fires, the binding carries +//! `EntryKind::HttpRoute`, and the `RouteShape` + per-formal +//! `request_params` match the brief's contract. Benign fixtures +//! must produce the same adapter binding shape as the vuln fixtures +//! — the adapter only models the route, the differential outcome of +//! a verifier run is what distinguishes the two. + +#![cfg(feature = "dynamic")] + +use nyx_scanner::dynamic::framework::{detect_binding, HttpMethod, ParamSource}; +use nyx_scanner::evidence::EntryKind; +use nyx_scanner::summary::FuncSummary; +use nyx_scanner::symbol::Lang; + +fn parse_python(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_python::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() +} + +fn summary_for(name: &str, file: &str) -> FuncSummary { + FuncSummary { + name: name.into(), + file_path: file.into(), + lang: "python".into(), + ..Default::default() + } +} + +#[test] +fn flask_vuln_fixture_binds_route() { + let path = "tests/dynamic_fixtures/python_frameworks/flask/vuln.py"; + let bytes = std::fs::read(path).expect("flask vuln fixture exists"); + let tree = parse_python(&bytes); + let summary = summary_for("run_cmd", path); + let binding = detect_binding(&summary, tree.root_node(), &bytes, Lang::Python) + .expect("flask adapter must bind"); + assert_eq!(binding.adapter, "python-flask"); + assert_eq!(binding.kind, EntryKind::HttpRoute); + let route = binding.route.as_ref().expect("route"); + assert_eq!(route.path, "/run"); + assert_eq!(route.method, HttpMethod::GET); +} + +#[test] +fn flask_benign_fixture_binds_same_route_shape() { + let path = "tests/dynamic_fixtures/python_frameworks/flask/benign.py"; + let bytes = std::fs::read(path).expect("flask benign fixture exists"); + let tree = parse_python(&bytes); + let summary = summary_for("run_cmd", path); + let binding = detect_binding(&summary, tree.root_node(), &bytes, Lang::Python) + .expect("flask adapter must bind benign fixture"); + assert_eq!(binding.adapter, "python-flask"); + let route = binding.route.as_ref().expect("route"); + assert_eq!(route.path, "/run"); + assert_eq!(route.method, HttpMethod::GET); +} + +#[test] +fn fastapi_vuln_fixture_binds_route_with_query_param() { + let path = "tests/dynamic_fixtures/python_frameworks/fastapi/vuln.py"; + let bytes = std::fs::read(path).expect("fastapi vuln fixture exists"); + let tree = parse_python(&bytes); + let summary = summary_for("run_cmd", path); + let binding = detect_binding(&summary, tree.root_node(), &bytes, Lang::Python) + .expect("fastapi adapter must bind"); + assert_eq!(binding.adapter, "python-fastapi"); + let route = binding.route.as_ref().expect("route"); + assert_eq!(route.path, "/run"); + assert_eq!(route.method, HttpMethod::GET); + let cmd_binding = binding + .request_params + .iter() + .find(|p| p.name == "cmd") + .expect("cmd formal"); + assert!(matches!(cmd_binding.source, ParamSource::QueryParam(_))); +} + +#[test] +fn fastapi_benign_fixture_binds_same_route_shape() { + let path = "tests/dynamic_fixtures/python_frameworks/fastapi/benign.py"; + let bytes = std::fs::read(path).expect("fastapi benign fixture exists"); + let tree = parse_python(&bytes); + let summary = summary_for("run_cmd", path); + let binding = detect_binding(&summary, tree.root_node(), &bytes, Lang::Python) + .expect("fastapi adapter must bind benign fixture"); + assert_eq!(binding.adapter, "python-fastapi"); + let route = binding.route.as_ref().expect("route"); + assert_eq!(route.path, "/run"); + assert_eq!(route.method, HttpMethod::GET); +} + +#[test] +fn django_vuln_fixture_binds_route_via_urlconf() { + let path = "tests/dynamic_fixtures/python_frameworks/django/vuln.py"; + let bytes = std::fs::read(path).expect("django vuln fixture exists"); + let tree = parse_python(&bytes); + let summary = summary_for("run_cmd", path); + let binding = detect_binding(&summary, tree.root_node(), &bytes, Lang::Python) + .expect("django adapter must bind"); + assert_eq!(binding.adapter, "python-django"); + let route = binding.route.as_ref().expect("route"); + assert_eq!(route.path, "run/"); + let request_binding = binding + .request_params + .iter() + .find(|p| p.name == "request") + .expect("request formal"); + assert!(matches!(request_binding.source, ParamSource::Implicit)); +} + +#[test] +fn django_benign_fixture_binds_same_route_shape() { + let path = "tests/dynamic_fixtures/python_frameworks/django/benign.py"; + let bytes = std::fs::read(path).expect("django benign fixture exists"); + let tree = parse_python(&bytes); + let summary = summary_for("run_cmd", path); + let binding = detect_binding(&summary, tree.root_node(), &bytes, Lang::Python) + .expect("django adapter must bind benign fixture"); + assert_eq!(binding.adapter, "python-django"); + assert_eq!(binding.route.as_ref().unwrap().path, "run/"); +} + +#[test] +fn starlette_vuln_fixture_binds_route_via_routes_list() { + let path = "tests/dynamic_fixtures/python_frameworks/starlette/vuln.py"; + let bytes = std::fs::read(path).expect("starlette vuln fixture exists"); + let tree = parse_python(&bytes); + let summary = summary_for("run_cmd", path); + let binding = detect_binding(&summary, tree.root_node(), &bytes, Lang::Python) + .expect("starlette adapter must bind"); + assert_eq!(binding.adapter, "python-starlette"); + let route = binding.route.as_ref().expect("route"); + assert_eq!(route.path, "/run"); + assert_eq!(route.method, HttpMethod::GET); +} + +#[test] +fn starlette_benign_fixture_binds_same_route_shape() { + let path = "tests/dynamic_fixtures/python_frameworks/starlette/benign.py"; + let bytes = std::fs::read(path).expect("starlette benign fixture exists"); + let tree = parse_python(&bytes); + let summary = summary_for("run_cmd", path); + let binding = detect_binding(&summary, tree.root_node(), &bytes, Lang::Python) + .expect("starlette adapter must bind benign fixture"); + assert_eq!(binding.adapter, "python-starlette"); + let route = binding.route.as_ref().expect("route"); + assert_eq!(route.path, "/run"); +} + +#[test] +fn fastapi_adapter_runs_before_starlette_for_fastapi_files() { + // Regression: a FastAPI file imports starlette transitively via + // `from starlette.responses import ...`, so the Starlette adapter + // would otherwise fire for it. Registration order + // (python-fastapi before python-starlette alphabetically) + + // the FastAPI adapter's tighter import check protect against + // mis-routing. + let src: &[u8] = b"from fastapi import FastAPI\nfrom starlette.responses import PlainTextResponse\napp = FastAPI()\n@app.get(\"/x\")\ndef handler(q: str = \"\"):\n return q\n"; + let tree = parse_python(src); + let summary = summary_for("handler", "phantom.py"); + let binding = + detect_binding(&summary, tree.root_node(), src, Lang::Python).expect("adapter fires"); + assert_eq!(binding.adapter, "python-fastapi"); +}