From aaa1fd7edee5e059d902ba2009cfa04e4fe2bb12 Mon Sep 17 00:00:00 2001 From: elipeter Date: Sat, 23 May 2026 14:02:18 -0500 Subject: [PATCH] refactor(dynamic): enhance SQS framework binding logic and auto-detect broker dependencies in Python/JavaScript --- src/dynamic/framework/adapters/sqs_node.rs | 135 ++++++++++++++++++--- src/dynamic/lang/js_shared.rs | 118 +++++++++++++++++- src/dynamic/lang/python.rs | 106 +++++++++++++++- 3 files changed, 340 insertions(+), 19 deletions(-) diff --git a/src/dynamic/framework/adapters/sqs_node.rs b/src/dynamic/framework/adapters/sqs_node.rs index dd891b92..477d5c1b 100644 --- a/src/dynamic/framework/adapters/sqs_node.rs +++ b/src/dynamic/framework/adapters/sqs_node.rs @@ -4,6 +4,7 @@ use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding}; use crate::evidence::EntryKind; use crate::summary::FuncSummary; +use crate::summary::ssa_summary::SsaFuncSummary; use crate::symbol::Lang; pub struct SqsNodeAdapter; @@ -58,32 +59,82 @@ impl FrameworkAdapter for SqsNodeAdapter { fn detect( &self, summary: &FuncSummary, - _ast: tree_sitter::Node<'_>, + ast: tree_sitter::Node<'_>, file_bytes: &[u8], ) -> Option { - let matches_call = super::any_callee_matches(summary, callee_is_sqs); - let matches_source = source_imports_sqs(file_bytes); - if matches_call || matches_source { - Some(FrameworkBinding { - adapter: ADAPTER_NAME.to_owned(), - kind: EntryKind::MessageHandler { - queue: extract_queue(file_bytes), - message_schema: None, - }, - route: None, - request_params: Vec::new(), - response_writer: None, - middleware: Vec::new(), - }) - } else { - None + detect_sqs_node(summary, None, ast, file_bytes) + } + + fn detect_with_context( + &self, + summary: &FuncSummary, + ssa_summary: Option<&SsaFuncSummary>, + ast: tree_sitter::Node<'_>, + file_bytes: &[u8], + ) -> Option { + detect_sqs_node(summary, ssa_summary, ast, file_bytes) + } +} + +fn detect_sqs_node( + summary: &FuncSummary, + ssa_summary: Option<&SsaFuncSummary>, + _ast: tree_sitter::Node<'_>, + file_bytes: &[u8], +) -> Option { + let matches_call = super::any_callee_matches(summary, callee_is_sqs); + let matches_source = source_imports_sqs(file_bytes); + if !(matches_call || matches_source) { + return None; + } + if !sqs_receiver_facts_allow(summary, ssa_summary) { + return None; + } + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::MessageHandler { + queue: extract_queue(file_bytes), + message_schema: None, + }, + route: None, + request_params: Vec::new(), + response_writer: None, + middleware: Vec::new(), + }) +} + +fn sqs_receiver_facts_allow(summary: &FuncSummary, ssa_summary: Option<&SsaFuncSummary>) -> bool { + let Some(ssa_summary) = ssa_summary else { + return true; + }; + for site in &summary.callees { + if !callee_is_sqs(&site.name) || site.receiver.is_none() { + continue; + } + let Some(container) = ssa_summary + .typed_call_receivers + .iter() + .find(|(ord, _)| *ord == site.ordinal) + .map(|(_, container)| container.as_str()) + else { + continue; + }; + if !typed_container_allows_sqs(container) { + return false; } } + true +} + +fn typed_container_allows_sqs(container: &str) -> bool { + let lc = container.to_ascii_lowercase(); + lc.contains("sqs") || lc.contains("queue") || lc == "consumer" } #[cfg(test)] mod tests { use super::*; + use crate::summary::CalleeSite; fn parse_js(src: &[u8]) -> tree_sitter::Tree { let mut parser = tree_sitter::Parser::new(); @@ -109,4 +160,54 @@ mod tests { assert_eq!(queue, "http://localhost/q"); } } + + #[test] + fn ssa_receiver_type_rejects_non_sqs_send_collision() { + let src: &[u8] = b"const { SQSClient } = require('@aws-sdk/client-sqs');\n\ + function handler(env) {}\n\ + Promise.resolve().send(handler);\n"; + let tree = parse_js(src); + let mut summary = FuncSummary { + name: "handler".into(), + ..Default::default() + }; + summary.callees.push(CalleeSite { + name: "promise.send".to_owned(), + receiver: Some("promise".to_owned()), + ordinal: 0, + ..Default::default() + }); + let mut ssa = SsaFuncSummary::default(); + ssa.typed_call_receivers.push((0, "Promise".to_owned())); + assert!( + SqsNodeAdapter + .detect_with_context(&summary, Some(&ssa), tree.root_node(), src) + .is_none() + ); + } + + #[test] + fn ssa_receiver_type_keeps_sqs_client_send() { + let src: &[u8] = b"const { SQSClient } = require('@aws-sdk/client-sqs');\n\ + function handler(env) {}\n\ + client.send(handler);\n"; + let tree = parse_js(src); + let mut summary = FuncSummary { + name: "handler".into(), + ..Default::default() + }; + summary.callees.push(CalleeSite { + name: "client.send".to_owned(), + receiver: Some("client".to_owned()), + ordinal: 0, + ..Default::default() + }); + let mut ssa = SsaFuncSummary::default(); + ssa.typed_call_receivers.push((0, "SQSClient".to_owned())); + assert!( + SqsNodeAdapter + .detect_with_context(&summary, Some(&ssa), tree.root_node(), src) + .is_some() + ); + } } diff --git a/src/dynamic/lang/js_shared.rs b/src/dynamic/lang/js_shared.rs index ffa4ed9e..516dd4ef 100644 --- a/src/dynamic/lang/js_shared.rs +++ b/src/dynamic/lang/js_shared.rs @@ -894,7 +894,7 @@ _broker.subscribe({queue:?}, async (envelope) => {{ source: body, filename: "harness.js".to_owned(), command: vec!["node".to_owned(), "harness.js".to_owned()], - extra_files: Vec::new(), + extra_files: message_handler_dependency_files(spec), entry_subpath: Some(entry_subpath.to_owned()), } } @@ -2446,6 +2446,65 @@ fn read_entry_source(entry_file: &str) -> String { String::new() } +fn message_handler_dependency_files(spec: &HarnessSpec) -> Vec<(String, String)> { + if spec.expected_cap != crate::labels::Cap::CODE_EXEC { + return Vec::new(); + } + let source = read_entry_source(&spec.entry_file); + let deps = js_message_handler_deps(&source); + if deps.is_empty() { + return Vec::new(); + } + vec![ + ( + "package.json".to_owned(), + package_json_multi("nyx-harness-message-handler", &deps), + ), + ( + "package-lock.json".to_owned(), + package_lock_skeleton("nyx-harness-message-handler"), + ), + ] +} + +fn js_message_handler_deps(source: &str) -> Vec<(&'static str, &'static str)> { + let mut deps = Vec::new(); + for raw_line in source.lines() { + let line = raw_line.trim_start(); + if line.starts_with("//") || line.starts_with("/*") || line.starts_with('*') { + continue; + } + if (line.contains("= require('@aws-sdk/client-sqs')") + || line.contains("= require(\"@aws-sdk/client-sqs\")") + || line.starts_with("import ") + && (line.contains(" from '@aws-sdk/client-sqs'") + || line.contains(" from \"@aws-sdk/client-sqs\""))) + && !deps.iter().any(|(name, _)| *name == "@aws-sdk/client-sqs") + { + deps.push(("@aws-sdk/client-sqs", "^3.583.0")); + } + if (line.contains("= require('aws-sdk/clients/sqs')") + || line.contains("= require(\"aws-sdk/clients/sqs\")") + || line.starts_with("import ") + && (line.contains(" from 'aws-sdk/clients/sqs'") + || line.contains(" from \"aws-sdk/clients/sqs\""))) + && !deps.iter().any(|(name, _)| *name == "aws-sdk") + { + deps.push(("aws-sdk", "^2.1692.0")); + } + if (line.contains("= require('sqs-consumer')") + || line.contains("= require(\"sqs-consumer\")") + || line.starts_with("import ") + && (line.contains(" from 'sqs-consumer'") + || line.contains(" from \"sqs-consumer\""))) + && !deps.iter().any(|(name, _)| *name == "sqs-consumer") + { + deps.push(("sqs-consumer", "^11.5.0")); + } + } + deps +} + /// File name the harness's `require` / `import()` will reach for. /// /// Both JS and TS fixtures stage their entry source at `workdir/entry.js` @@ -3340,6 +3399,63 @@ mod tests { assert!(extras.is_empty()); } + #[test] + fn message_handler_deps_ignore_string_markers() { + let src = r#" +const _markerRequire = "require('sqs-consumer')"; +const _markerImport = "@aws-sdk/client-sqs"; +"#; + assert!(js_message_handler_deps(src).is_empty()); + } + + #[test] + fn message_handler_deps_detect_real_sqs_imports() { + let src = r#" +const { Consumer } = require('sqs-consumer'); +const { SQSClient } = require('@aws-sdk/client-sqs'); +const SQS = require('aws-sdk/clients/sqs'); +"#; + let deps = js_message_handler_deps(src); + assert!(deps.iter().any(|(name, _)| *name == "sqs-consumer")); + assert!(deps.iter().any(|(name, _)| *name == "@aws-sdk/client-sqs")); + assert!(deps.iter().any(|(name, _)| *name == "aws-sdk")); + } + + #[test] + fn emit_message_handler_stages_package_json_for_hard_imports() { + let dir = std::env::temp_dir().join("nyx_message_handler_node_deps"); + let _ = std::fs::remove_dir_all(&dir); + std::fs::create_dir_all(&dir).unwrap(); + let entry = dir.join("entry.js"); + std::fs::write( + &entry, + "const { Consumer } = require('sqs-consumer');\n\ + function handler(envelope) { return envelope.Body; }\n\ + module.exports = { handler };\n", + ) + .unwrap(); + + let mut spec = make_spec( + EntryKind::MessageHandler { + queue: "jobs".to_owned(), + message_schema: None, + }, + "handler", + PayloadSlot::Param(0), + ); + spec.entry_file = entry.to_string_lossy().into_owned(); + + let h = emit(&spec, false).unwrap(); + assert!( + h.extra_files + .iter() + .any(|(p, c)| p == "package.json" && c.contains("sqs-consumer")), + "message handler must stage package.json for hard broker imports" + ); + assert!(h.extra_files.iter().any(|(p, _)| p == "package-lock.json")); + let _ = std::fs::remove_dir_all(&dir); + } + #[test] fn entry_require_path_strips_extension() { assert_eq!(entry_require_path("entry.js"), "entry"); diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index b6e30c34..e5fff306 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -1009,7 +1009,7 @@ except Exception as _e: source: format!("{preamble}\n{body}\n{postamble}"), filename: "harness.py".to_owned(), command: vec!["python3".to_owned(), "harness.py".to_owned()], - extra_files: vec![], + extra_files: message_handler_dependency_files(spec), entry_subpath: None, } } @@ -3072,6 +3072,55 @@ fn read_entry_source(entry_file: &str) -> String { String::new() } +fn message_handler_dependency_files(spec: &HarnessSpec) -> Vec<(String, String)> { + if spec.expected_cap != crate::labels::Cap::CODE_EXEC { + return Vec::new(); + } + let source = read_entry_source(&spec.entry_file); + let deps = python_message_handler_deps(&source); + if deps.is_empty() { + return Vec::new(); + } + let mut body = String::new(); + for dep in deps { + body.push_str(dep); + body.push('\n'); + } + vec![("requirements.txt".to_owned(), body)] +} + +fn python_message_handler_deps(source: &str) -> Vec<&'static str> { + let mut deps = Vec::new(); + for raw_line in source.lines() { + let line = raw_line.trim_start(); + if line.starts_with('#') { + continue; + } + if (line.starts_with("from kafka import") || line.starts_with("import kafka")) + && !deps.contains(&"kafka-python") + { + deps.push("kafka-python"); + } + if (line.starts_with("import boto3") || line.starts_with("from boto3 import")) + && !deps.contains(&"boto3") + { + deps.push("boto3"); + } + if (line.starts_with("from google.cloud import pubsub") + || line.starts_with("import google.cloud.pubsub")) + && !deps.contains(&"google-cloud-pubsub") + { + deps.push("google-cloud-pubsub"); + } + if (line.starts_with("import pika") || line.starts_with("from pika import")) + && !deps.contains(&"pika") + { + deps.push("pika"); + } + } + deps +} + fn extra_files_for_shape(shape: PythonShape) -> Vec<(String, String)> { match shape { PythonShape::FlaskRoute => vec![("requirements.txt".to_owned(), "Flask\n".to_owned())], @@ -3998,6 +4047,61 @@ mod tests { && c.contains("httpx"))); } + #[test] + fn message_handler_deps_ignore_string_markers() { + let src = r#" +_NYX_ADAPTER_MARKER = "from kafka import KafkaConsumer" +_OTHER = "boto3.client('sqs')" +"#; + assert!(python_message_handler_deps(src).is_empty()); + } + + #[test] + fn message_handler_deps_detect_real_python_broker_imports() { + let src = r#" +from kafka import KafkaConsumer +import boto3 +from google.cloud import pubsub_v1 +import pika +"#; + assert_eq!( + python_message_handler_deps(src), + vec!["kafka-python", "boto3", "google-cloud-pubsub", "pika"] + ); + } + + #[test] + fn emit_message_handler_stages_requirements_for_hard_imports() { + let dir = std::env::temp_dir().join("nyx_message_handler_python_deps"); + let _ = std::fs::remove_dir_all(&dir); + std::fs::create_dir_all(&dir).unwrap(); + let entry = dir.join("entry.py"); + std::fs::write( + &entry, + "from kafka import KafkaConsumer\n\ + def handler(message):\n\ + return str(message)\n", + ) + .unwrap(); + + let mut spec = make_spec(PayloadSlot::Param(0)); + spec.entry_file = entry.to_string_lossy().into_owned(); + spec.entry_name = "handler".to_owned(); + spec.entry_kind = EntryKind::MessageHandler { + queue: "orders".to_owned(), + message_schema: None, + }; + spec.expected_cap = Cap::CODE_EXEC; + + let h = emit(&spec).unwrap(); + assert!( + h.extra_files + .iter() + .any(|(p, c)| { p == "requirements.txt" && c.contains("kafka-python") }) + ); + let _ = std::fs::remove_dir_all(&dir); + } + fn make_spec_with(kind: EntryKind, name: &str) -> HarnessSpec { let mut s = make_spec(PayloadSlot::Param(0)); s.entry_kind = kind;