From bd0135e4231ed48e4bc18f981e295ef84fb8dfb2 Mon Sep 17 00:00:00 2001 From: pitboss Date: Wed, 20 May 2026 16:03:40 -0500 Subject: [PATCH] =?UTF-8?q?[pitboss]=20phase=2020:=20Track=20M.2=20?= =?UTF-8?q?=E2=80=94=20`MessageHandler`=20end-to-end=20(Kafka=20/=20SQS=20?= =?UTF-8?q?/=20Pub-Sub=20/=20NATS=20/=20RabbitMQ)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/dynamic/framework/adapters/kafka_java.rs | 115 ++++ .../framework/adapters/kafka_python.rs | 136 +++++ src/dynamic/framework/adapters/mod.rs | 20 + src/dynamic/framework/adapters/nats_go.rs | 108 ++++ src/dynamic/framework/adapters/pubsub_go.rs | 108 ++++ .../framework/adapters/pubsub_python.rs | 115 ++++ src/dynamic/framework/adapters/rabbit_java.rs | 116 ++++ .../framework/adapters/rabbit_python.rs | 111 ++++ src/dynamic/framework/adapters/sqs_java.rs | 110 ++++ src/dynamic/framework/adapters/sqs_node.rs | 112 ++++ src/dynamic/framework/adapters/sqs_python.rs | 112 ++++ src/dynamic/framework/mod.rs | 31 +- src/dynamic/framework/registry.rs | 10 + src/dynamic/lang/go.rs | 158 +++++ src/dynamic/lang/java.rs | 186 ++++++ src/dynamic/lang/js_shared.rs | 87 +++ src/dynamic/lang/mod.rs | 55 +- src/dynamic/lang/python.rs | 167 ++++++ src/dynamic/stubs/broker_kafka.rs | 109 ++++ src/dynamic/stubs/broker_nats.rs | 81 +++ src/dynamic/stubs/broker_pubsub.rs | 100 ++++ src/dynamic/stubs/broker_rabbit.rs | 88 +++ src/dynamic/stubs/broker_sqs.rs | 119 ++++ src/dynamic/stubs/mod.rs | 10 + .../message_handler/kafka_java/Benign.java | 9 + .../message_handler/kafka_java/Vuln.java | 15 + .../message_handler/kafka_python/benign.py | 9 + .../message_handler/kafka_python/vuln.py | 25 + .../message_handler/nats_go/benign.go | 19 + .../message_handler/nats_go/vuln.go | 22 + .../message_handler/pubsub_go/benign.go | 19 + .../message_handler/pubsub_go/vuln.go | 24 + .../message_handler/pubsub_python/benign.py | 21 + .../message_handler/pubsub_python/vuln.py | 28 + .../message_handler/rabbit_java/Benign.java | 10 + .../message_handler/rabbit_java/Vuln.java | 12 + .../message_handler/rabbit_python/benign.py | 12 + .../message_handler/rabbit_python/vuln.py | 19 + .../message_handler/sqs_java/Benign.java | 11 + .../message_handler/sqs_java/Vuln.java | 13 + .../message_handler/sqs_node/benign.js | 16 + .../message_handler/sqs_node/vuln.js | 22 + .../message_handler/sqs_python/benign.py | 10 + .../message_handler/sqs_python/vuln.py | 17 + tests/message_handler_corpus.rs | 555 ++++++++++++++++++ 45 files changed, 3227 insertions(+), 25 deletions(-) create mode 100644 src/dynamic/framework/adapters/kafka_java.rs create mode 100644 src/dynamic/framework/adapters/kafka_python.rs create mode 100644 src/dynamic/framework/adapters/nats_go.rs create mode 100644 src/dynamic/framework/adapters/pubsub_go.rs create mode 100644 src/dynamic/framework/adapters/pubsub_python.rs create mode 100644 src/dynamic/framework/adapters/rabbit_java.rs create mode 100644 src/dynamic/framework/adapters/rabbit_python.rs create mode 100644 src/dynamic/framework/adapters/sqs_java.rs create mode 100644 src/dynamic/framework/adapters/sqs_node.rs create mode 100644 src/dynamic/framework/adapters/sqs_python.rs create mode 100644 src/dynamic/stubs/broker_kafka.rs create mode 100644 src/dynamic/stubs/broker_nats.rs create mode 100644 src/dynamic/stubs/broker_pubsub.rs create mode 100644 src/dynamic/stubs/broker_rabbit.rs create mode 100644 src/dynamic/stubs/broker_sqs.rs create mode 100644 tests/dynamic_fixtures/message_handler/kafka_java/Benign.java create mode 100644 tests/dynamic_fixtures/message_handler/kafka_java/Vuln.java create mode 100644 tests/dynamic_fixtures/message_handler/kafka_python/benign.py create mode 100644 tests/dynamic_fixtures/message_handler/kafka_python/vuln.py create mode 100644 tests/dynamic_fixtures/message_handler/nats_go/benign.go create mode 100644 tests/dynamic_fixtures/message_handler/nats_go/vuln.go create mode 100644 tests/dynamic_fixtures/message_handler/pubsub_go/benign.go create mode 100644 tests/dynamic_fixtures/message_handler/pubsub_go/vuln.go create mode 100644 tests/dynamic_fixtures/message_handler/pubsub_python/benign.py create mode 100644 tests/dynamic_fixtures/message_handler/pubsub_python/vuln.py create mode 100644 tests/dynamic_fixtures/message_handler/rabbit_java/Benign.java create mode 100644 tests/dynamic_fixtures/message_handler/rabbit_java/Vuln.java create mode 100644 tests/dynamic_fixtures/message_handler/rabbit_python/benign.py create mode 100644 tests/dynamic_fixtures/message_handler/rabbit_python/vuln.py create mode 100644 tests/dynamic_fixtures/message_handler/sqs_java/Benign.java create mode 100644 tests/dynamic_fixtures/message_handler/sqs_java/Vuln.java create mode 100644 tests/dynamic_fixtures/message_handler/sqs_node/benign.js create mode 100644 tests/dynamic_fixtures/message_handler/sqs_node/vuln.js create mode 100644 tests/dynamic_fixtures/message_handler/sqs_python/benign.py create mode 100644 tests/dynamic_fixtures/message_handler/sqs_python/vuln.py create mode 100644 tests/message_handler_corpus.rs diff --git a/src/dynamic/framework/adapters/kafka_java.rs b/src/dynamic/framework/adapters/kafka_java.rs new file mode 100644 index 00000000..849e396b --- /dev/null +++ b/src/dynamic/framework/adapters/kafka_java.rs @@ -0,0 +1,115 @@ +//! Phase 20 (Track M.2) — Java Kafka consumer adapter. +//! +//! Fires on Spring Kafka `@KafkaListener` annotations or +//! `org.apache.kafka.clients.consumer.KafkaConsumer` references. Best- +//! effort topic extraction reads the literal that follows `topics = +//! "..."` / `topics = {"..."}` / `subscribe(Arrays.asList("..."))`. + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; + +pub struct KafkaJavaAdapter; + +const ADAPTER_NAME: &str = "kafka-java"; + +fn callee_is_kafka(name: &str) -> bool { + let last = name.rsplit_once('.').map(|(_, s)| s).unwrap_or(name); + matches!( + last, + "KafkaConsumer" | "subscribe" | "poll" | "onMessage" | "consume" + ) +} + +fn source_imports_kafka(file_bytes: &[u8]) -> bool { + const NEEDLES: &[&[u8]] = &[ + b"org.apache.kafka", + b"org.springframework.kafka", + b"@KafkaListener", + ]; + NEEDLES + .iter() + .any(|n| file_bytes.windows(n.len()).any(|w| w == *n)) +} + +fn extract_topic(file_bytes: &[u8]) -> String { + let text = std::str::from_utf8(file_bytes).unwrap_or(""); + for needle in ["topics = \"", "topics=\"", "topics = {\"", "subscribe(Arrays.asList(\""] { + if let Some(idx) = text.find(needle) { + let after = &text[idx + needle.len()..]; + if let Some(end) = after.find('"') { + return after[..end].to_owned(); + } + } + } + String::new() +} + +impl FrameworkAdapter for KafkaJavaAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Java + } + + fn detect( + &self, + summary: &FuncSummary, + _ast: tree_sitter::Node<'_>, + file_bytes: &[u8], + ) -> Option { + let matches_call = super::any_callee_matches(summary, callee_is_kafka); + let matches_source = source_imports_kafka(file_bytes); + if matches_call || matches_source { + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::MessageHandler { + queue: extract_topic(file_bytes), + message_schema: None, + }, + route: None, + request_params: Vec::new(), + response_writer: None, + middleware: Vec::new(), + }) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_java(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_java::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + #[test] + fn fires_on_spring_kafka_listener() { + let src: &[u8] = b"import org.springframework.kafka.annotation.KafkaListener;\n\ + public class Vuln {\n\ + @KafkaListener(topics = \"orders\")\n\ + public void onMessage(String body) {}\n\ + }\n"; + let tree = parse_java(src); + let summary = FuncSummary { + name: "onMessage".into(), + ..Default::default() + }; + let binding = KafkaJavaAdapter + .detect(&summary, tree.root_node(), src) + .expect("@KafkaListener binds"); + assert!(matches!(binding.kind, EntryKind::MessageHandler { .. })); + if let EntryKind::MessageHandler { queue, .. } = binding.kind { + assert_eq!(queue, "orders"); + } + } +} diff --git a/src/dynamic/framework/adapters/kafka_python.rs b/src/dynamic/framework/adapters/kafka_python.rs new file mode 100644 index 00000000..c1c98b15 --- /dev/null +++ b/src/dynamic/framework/adapters/kafka_python.rs @@ -0,0 +1,136 @@ +//! Phase 20 (Track M.2) — Python Kafka consumer adapter. +//! +//! Fires when the surrounding source imports the canonical Python +//! Kafka clients (`kafka-python` or `confluent-kafka`) and the function +//! body invokes a consumer-shaped callee. The binding's +//! [`EntryKind::MessageHandler`] is stamped with a best-effort `queue` +//! extracted from the source (a `KafkaConsumer('topic', ...)` / +//! `Consumer({"group.id": ..., "topics": ["t"]}).subscribe([...])` +//! literal); a missing topic falls back to the empty string. + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; + +pub struct KafkaPythonAdapter; + +const ADAPTER_NAME: &str = "kafka-python"; + +fn callee_is_kafka_consumer(name: &str) -> bool { + let last = name.rsplit_once('.').map(|(_, s)| s).unwrap_or(name); + matches!( + last, + "KafkaConsumer" | "Consumer" | "subscribe" | "poll" | "consume" | "process_message" + ) +} + +fn source_imports_kafka(file_bytes: &[u8]) -> bool { + const NEEDLES: &[&[u8]] = &[ + b"from kafka", + b"import kafka", + b"from confluent_kafka", + b"import confluent_kafka", + ]; + NEEDLES + .iter() + .any(|n| file_bytes.windows(n.len()).any(|w| w == *n)) +} + +fn extract_topic_literal(file_bytes: &[u8]) -> String { + let text = std::str::from_utf8(file_bytes).unwrap_or(""); + for needle in ["KafkaConsumer(", ".subscribe(", "topic="] { + if let Some(idx) = text.find(needle) { + let after = &text[idx + needle.len()..]; + for (open, close) in [('"', '"'), ('\'', '\'')] { + if let Some(o) = after.find(open) { + let rest = &after[o + 1..]; + if let Some(c) = rest.find(close) { + return rest[..c].to_owned(); + } + } + } + } + } + String::new() +} + +impl FrameworkAdapter for KafkaPythonAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Python + } + + fn detect( + &self, + summary: &FuncSummary, + _ast: tree_sitter::Node<'_>, + file_bytes: &[u8], + ) -> Option { + let matches_call = super::any_callee_matches(summary, callee_is_kafka_consumer); + let matches_source = source_imports_kafka(file_bytes); + if matches_call || matches_source { + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::MessageHandler { + queue: extract_topic_literal(file_bytes), + message_schema: None, + }, + route: None, + request_params: Vec::new(), + response_writer: None, + middleware: Vec::new(), + }) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_python(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_python::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + #[test] + fn fires_on_kafka_python_consumer() { + let src: &[u8] = b"from kafka import KafkaConsumer\n\n\ + def handler(msg):\n print(msg)\n\n\ + consumer = KafkaConsumer('orders', bootstrap_servers='broker:9092')\n"; + let tree = parse_python(src); + let summary = FuncSummary { + name: "handler".into(), + ..Default::default() + }; + let binding = KafkaPythonAdapter + .detect(&summary, tree.root_node(), src) + .expect("kafka import binds"); + assert_eq!(binding.adapter, "kafka-python"); + assert!(matches!(binding.kind, EntryKind::MessageHandler { .. })); + if let EntryKind::MessageHandler { queue, .. } = binding.kind { + assert_eq!(queue, "orders"); + } + } + + #[test] + fn skips_plain_function() { + let src: &[u8] = b"def add(a, b):\n return a + b\n"; + let tree = parse_python(src); + let summary = FuncSummary { + name: "add".into(), + ..Default::default() + }; + assert!(KafkaPythonAdapter + .detect(&summary, tree.root_node(), src) + .is_none()); + } +} diff --git a/src/dynamic/framework/adapters/mod.rs b/src/dynamic/framework/adapters/mod.rs index 8c1e6e01..fa6b5373 100644 --- a/src/dynamic/framework/adapters/mod.rs +++ b/src/dynamic/framework/adapters/mod.rs @@ -36,9 +36,12 @@ pub mod js_handlebars; pub mod js_koa; pub mod js_nest; pub mod js_routes; +pub mod kafka_java; +pub mod kafka_python; pub mod ldap_php; pub mod ldap_python; pub mod ldap_spring; +pub mod nats_go; pub mod php_codeigniter; pub mod php_laravel; pub mod php_routes; @@ -48,6 +51,8 @@ pub mod php_unserialize; pub mod pp_json_deep_assign; pub mod pp_lodash_merge; pub mod pp_object_assign; +pub mod pubsub_go; +pub mod pubsub_python; pub mod python_django; pub mod python_fastapi; pub mod python_flask; @@ -55,6 +60,8 @@ pub mod python_jinja2; pub mod python_pickle; pub mod python_routes; pub mod python_starlette; +pub mod rabbit_java; +pub mod rabbit_python; pub mod redirect_go; pub mod redirect_java; pub mod redirect_js; @@ -73,6 +80,9 @@ pub mod rust_axum; pub mod rust_rocket; pub mod rust_routes; pub mod rust_warp; +pub mod sqs_java; +pub mod sqs_node; +pub mod sqs_python; pub mod xpath_java; pub mod xpath_js; pub mod xpath_php; @@ -105,9 +115,12 @@ pub use js_fastify::JsFastifyAdapter; pub use js_handlebars::JsHandlebarsAdapter; pub use js_koa::JsKoaAdapter; pub use js_nest::{JsNestAdapter, TsNestAdapter}; +pub use kafka_java::KafkaJavaAdapter; +pub use kafka_python::KafkaPythonAdapter; pub use ldap_php::LdapPhpAdapter; pub use ldap_python::LdapPythonAdapter; pub use ldap_spring::LdapSpringAdapter; +pub use nats_go::NatsGoAdapter; pub use php_codeigniter::PhpCodeIgniterAdapter; pub use php_laravel::PhpLaravelAdapter; pub use php_symfony::PhpSymfonyAdapter; @@ -116,12 +129,16 @@ pub use php_unserialize::PhpUnserializeAdapter; pub use pp_json_deep_assign::{PpJsonDeepAssignJsAdapter, PpJsonDeepAssignTsAdapter}; pub use pp_lodash_merge::{PpLodashMergeJsAdapter, PpLodashMergeTsAdapter}; pub use pp_object_assign::{PpObjectAssignJsAdapter, PpObjectAssignTsAdapter}; +pub use pubsub_go::PubsubGoAdapter; +pub use pubsub_python::PubsubPythonAdapter; pub use python_django::PythonDjangoAdapter; pub use python_fastapi::PythonFastApiAdapter; pub use python_flask::PythonFlaskAdapter; pub use python_jinja2::PythonJinja2Adapter; pub use python_pickle::PythonPickleAdapter; pub use python_starlette::PythonStarletteAdapter; +pub use rabbit_java::RabbitJavaAdapter; +pub use rabbit_python::RabbitPythonAdapter; pub use redirect_go::RedirectGoAdapter; pub use redirect_java::RedirectJavaAdapter; pub use redirect_js::RedirectJsAdapter; @@ -138,6 +155,9 @@ pub use rust_actix::RustActixAdapter; pub use rust_axum::RustAxumAdapter; pub use rust_rocket::RustRocketAdapter; pub use rust_warp::RustWarpAdapter; +pub use sqs_java::SqsJavaAdapter; +pub use sqs_node::SqsNodeAdapter; +pub use sqs_python::SqsPythonAdapter; pub use xpath_java::XpathJavaAdapter; pub use xpath_js::XpathJsAdapter; pub use xpath_php::XpathPhpAdapter; diff --git a/src/dynamic/framework/adapters/nats_go.rs b/src/dynamic/framework/adapters/nats_go.rs new file mode 100644 index 00000000..77b0bae7 --- /dev/null +++ b/src/dynamic/framework/adapters/nats_go.rs @@ -0,0 +1,108 @@ +//! Phase 20 (Track M.2) — Go NATS subscriber adapter (`nats.go`). + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; + +pub struct NatsGoAdapter; + +const ADAPTER_NAME: &str = "nats-go"; + +fn callee_is_nats(name: &str) -> bool { + let last = name.rsplit_once('.').map(|(_, s)| s).unwrap_or(name); + matches!( + last, + "Subscribe" | "QueueSubscribe" | "Publish" | "HandleMessage" | "OnMessage" + ) +} + +fn source_imports_nats(file_bytes: &[u8]) -> bool { + const NEEDLES: &[&[u8]] = &[ + b"github.com/nats-io/nats.go", + b"nats.Connect", + b"nats.Msg", + ]; + NEEDLES + .iter() + .any(|n| file_bytes.windows(n.len()).any(|w| w == *n)) +} + +fn extract_subject(file_bytes: &[u8]) -> String { + let text = std::str::from_utf8(file_bytes).unwrap_or(""); + for needle in [".Subscribe(\"", ".QueueSubscribe(\""] { + if let Some(idx) = text.find(needle) { + let after = &text[idx + needle.len()..]; + if let Some(end) = after.find('"') { + return after[..end].to_owned(); + } + } + } + String::new() +} + +impl FrameworkAdapter for NatsGoAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Go + } + + fn detect( + &self, + summary: &FuncSummary, + _ast: tree_sitter::Node<'_>, + file_bytes: &[u8], + ) -> Option { + let matches_call = super::any_callee_matches(summary, callee_is_nats); + let matches_source = source_imports_nats(file_bytes); + if matches_call || matches_source { + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::MessageHandler { + queue: extract_subject(file_bytes), + message_schema: None, + }, + route: None, + request_params: Vec::new(), + response_writer: None, + middleware: Vec::new(), + }) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_go(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_go::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + #[test] + fn fires_on_nats_subscribe() { + let src: &[u8] = b"package entry\nimport \"github.com/nats-io/nats.go\"\n\ + func OnMessage(msg *nats.Msg) {}\n\ + var nc = nats.Connect()\n\ + var sub, _ = nc.Subscribe(\"events\", OnMessage)\n"; + let tree = parse_go(src); + let summary = FuncSummary { + name: "OnMessage".into(), + ..Default::default() + }; + let binding = NatsGoAdapter + .detect(&summary, tree.root_node(), src) + .expect("nats.Subscribe binds"); + if let EntryKind::MessageHandler { queue, .. } = binding.kind { + assert_eq!(queue, "events"); + } + } +} diff --git a/src/dynamic/framework/adapters/pubsub_go.rs b/src/dynamic/framework/adapters/pubsub_go.rs new file mode 100644 index 00000000..dfbbd7bb --- /dev/null +++ b/src/dynamic/framework/adapters/pubsub_go.rs @@ -0,0 +1,108 @@ +//! Phase 20 (Track M.2) — Go Google Pub/Sub subscriber adapter +//! (`cloud.google.com/go/pubsub`). + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; + +pub struct PubsubGoAdapter; + +const ADAPTER_NAME: &str = "pubsub-go"; + +fn callee_is_pubsub(name: &str) -> bool { + let last = name.rsplit_once('.').map(|(_, s)| s).unwrap_or(name); + matches!( + last, + "Receive" | "Subscription" | "Pull" | "Handle" | "OnMessage" + ) +} + +fn source_imports_pubsub(file_bytes: &[u8]) -> bool { + const NEEDLES: &[&[u8]] = &[ + b"cloud.google.com/go/pubsub", + b"pubsub.NewClient", + b"pubsub.Message", + ]; + NEEDLES + .iter() + .any(|n| file_bytes.windows(n.len()).any(|w| w == *n)) +} + +fn extract_topic(file_bytes: &[u8]) -> String { + let text = std::str::from_utf8(file_bytes).unwrap_or(""); + for needle in [".Subscription(\"", "SubscriptionID(\"", "TopicID(\""] { + if let Some(idx) = text.find(needle) { + let after = &text[idx + needle.len()..]; + if let Some(end) = after.find('"') { + return after[..end].to_owned(); + } + } + } + String::new() +} + +impl FrameworkAdapter for PubsubGoAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Go + } + + fn detect( + &self, + summary: &FuncSummary, + _ast: tree_sitter::Node<'_>, + file_bytes: &[u8], + ) -> Option { + let matches_call = super::any_callee_matches(summary, callee_is_pubsub); + let matches_source = source_imports_pubsub(file_bytes); + if matches_call || matches_source { + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::MessageHandler { + queue: extract_topic(file_bytes), + message_schema: None, + }, + route: None, + request_params: Vec::new(), + response_writer: None, + middleware: Vec::new(), + }) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_go(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_go::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + #[test] + fn fires_on_pubsub_subscription() { + let src: &[u8] = b"package entry\nimport \"cloud.google.com/go/pubsub\"\n\ + func Handle(msg *pubsub.Message) {}\n\ + var sub = pubsub.NewClient.Subscription(\"my-sub\")\n"; + let tree = parse_go(src); + let summary = FuncSummary { + name: "Handle".into(), + ..Default::default() + }; + let binding = PubsubGoAdapter + .detect(&summary, tree.root_node(), src) + .expect("pubsub.Subscription binds"); + if let EntryKind::MessageHandler { queue, .. } = binding.kind { + assert_eq!(queue, "my-sub"); + } + } +} diff --git a/src/dynamic/framework/adapters/pubsub_python.rs b/src/dynamic/framework/adapters/pubsub_python.rs new file mode 100644 index 00000000..5456f5c2 --- /dev/null +++ b/src/dynamic/framework/adapters/pubsub_python.rs @@ -0,0 +1,115 @@ +//! Phase 20 (Track M.2) — Python Google Pub/Sub subscriber adapter. + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; + +pub struct PubsubPythonAdapter; + +const ADAPTER_NAME: &str = "pubsub-python"; + +fn callee_is_pubsub(name: &str) -> bool { + let last = name.rsplit_once('.').map(|(_, s)| s).unwrap_or(name); + matches!( + last, + "subscribe" | "pull" | "callback" | "process_message" + ) +} + +fn source_imports_pubsub(file_bytes: &[u8]) -> bool { + const NEEDLES: &[&[u8]] = &[ + b"google.cloud.pubsub", + b"from google.cloud import pubsub", + b"google.cloud.pubsub_v1", + ]; + NEEDLES + .iter() + .any(|n| file_bytes.windows(n.len()).any(|w| w == *n)) +} + +fn extract_topic(file_bytes: &[u8]) -> String { + let text = std::str::from_utf8(file_bytes).unwrap_or(""); + // Needles include the opening quote so we only need to find the + // closing one — avoids picking up the next literal after a comma. + for (needle, close) in [ + (".subscribe(\"", '"'), + (".subscribe('", '\''), + ("subscription_path(\"", '"'), + ("subscription_path('", '\''), + ] { + if let Some(idx) = text.find(needle) { + let after = &text[idx + needle.len()..]; + if let Some(end) = after.find(close) { + return after[..end].to_owned(); + } + } + } + String::new() +} + +impl FrameworkAdapter for PubsubPythonAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Python + } + + fn detect( + &self, + summary: &FuncSummary, + _ast: tree_sitter::Node<'_>, + file_bytes: &[u8], + ) -> Option { + let matches_call = super::any_callee_matches(summary, callee_is_pubsub); + let matches_source = source_imports_pubsub(file_bytes); + if matches_call || matches_source { + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::MessageHandler { + queue: extract_topic(file_bytes), + message_schema: None, + }, + route: None, + request_params: Vec::new(), + response_writer: None, + middleware: Vec::new(), + }) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_python(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_python::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + #[test] + fn fires_on_pubsub_v1_subscribe() { + let src: &[u8] = b"from google.cloud import pubsub_v1\n\ + def callback(message):\n pass\n\ + sub = pubsub_v1.SubscriberClient()\n\ + sub.subscribe(\"projects/p/subscriptions/s\", callback=callback)\n"; + let tree = parse_python(src); + let summary = FuncSummary { + name: "callback".into(), + ..Default::default() + }; + let binding = PubsubPythonAdapter + .detect(&summary, tree.root_node(), src) + .expect("pubsub_v1 binds"); + if let EntryKind::MessageHandler { queue, .. } = binding.kind { + assert_eq!(queue, "projects/p/subscriptions/s"); + } + } +} diff --git a/src/dynamic/framework/adapters/rabbit_java.rs b/src/dynamic/framework/adapters/rabbit_java.rs new file mode 100644 index 00000000..0991f077 --- /dev/null +++ b/src/dynamic/framework/adapters/rabbit_java.rs @@ -0,0 +1,116 @@ +//! Phase 20 (Track M.2) — Java RabbitMQ consumer adapter +//! (`com.rabbitmq.client.Channel.basicConsume`, Spring AMQP +//! `@RabbitListener`). + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; + +pub struct RabbitJavaAdapter; + +const ADAPTER_NAME: &str = "rabbit-java"; + +fn callee_is_rabbit(name: &str) -> bool { + let last = name.rsplit_once('.').map(|(_, s)| s).unwrap_or(name); + matches!( + last, + "basicConsume" | "basicGet" | "handleDelivery" | "onMessage" | "receive" + ) +} + +fn source_imports_rabbit(file_bytes: &[u8]) -> bool { + const NEEDLES: &[&[u8]] = &[ + b"com.rabbitmq.client", + b"org.springframework.amqp.rabbit", + b"@RabbitListener", + ]; + NEEDLES + .iter() + .any(|n| file_bytes.windows(n.len()).any(|w| w == *n)) +} + +fn extract_queue(file_bytes: &[u8]) -> String { + let text = std::str::from_utf8(file_bytes).unwrap_or(""); + for needle in [ + "@RabbitListener(queues = \"", + "@RabbitListener(queues=\"", + "basicConsume(\"", + "queueDeclare(\"", + ] { + if let Some(idx) = text.find(needle) { + let after = &text[idx + needle.len()..]; + if let Some(end) = after.find('"') { + return after[..end].to_owned(); + } + } + } + String::new() +} + +impl FrameworkAdapter for RabbitJavaAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Java + } + + fn detect( + &self, + summary: &FuncSummary, + _ast: tree_sitter::Node<'_>, + file_bytes: &[u8], + ) -> Option { + let matches_call = super::any_callee_matches(summary, callee_is_rabbit); + let matches_source = source_imports_rabbit(file_bytes); + if matches_call || matches_source { + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::MessageHandler { + queue: extract_queue(file_bytes), + message_schema: None, + }, + route: None, + request_params: Vec::new(), + response_writer: None, + middleware: Vec::new(), + }) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_java(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_java::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + #[test] + fn fires_on_rabbit_listener_annotation() { + let src: &[u8] = b"import org.springframework.amqp.rabbit.annotation.RabbitListener;\n\ + public class Vuln {\n\ + @RabbitListener(queues = \"work\")\n\ + public void onMessage(String mid, String body) {}\n\ + }\n"; + let tree = parse_java(src); + let summary = FuncSummary { + name: "onMessage".into(), + ..Default::default() + }; + let binding = RabbitJavaAdapter + .detect(&summary, tree.root_node(), src) + .expect("@RabbitListener binds"); + if let EntryKind::MessageHandler { queue, .. } = binding.kind { + assert_eq!(queue, "work"); + } + } +} diff --git a/src/dynamic/framework/adapters/rabbit_python.rs b/src/dynamic/framework/adapters/rabbit_python.rs new file mode 100644 index 00000000..74e2778f --- /dev/null +++ b/src/dynamic/framework/adapters/rabbit_python.rs @@ -0,0 +1,111 @@ +//! Phase 20 (Track M.2) — Python RabbitMQ consumer adapter +//! (`pika.BlockingConnection`, `aio-pika`). + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; + +pub struct RabbitPythonAdapter; + +const ADAPTER_NAME: &str = "rabbit-python"; + +fn callee_is_rabbit(name: &str) -> bool { + let last = name.rsplit_once('.').map(|(_, s)| s).unwrap_or(name); + matches!( + last, + "basic_consume" | "basic_get" | "handle" | "on_message" | "process" + ) +} + +fn source_imports_rabbit(file_bytes: &[u8]) -> bool { + const NEEDLES: &[&[u8]] = &[ + b"import pika", + b"from pika", + b"import aio_pika", + b"from aio_pika", + ]; + NEEDLES + .iter() + .any(|n| file_bytes.windows(n.len()).any(|w| w == *n)) +} + +fn extract_queue(file_bytes: &[u8]) -> String { + let text = std::str::from_utf8(file_bytes).unwrap_or(""); + for needle in ["queue=\"", "queue='", "queue_declare(\"", "queue_declare('"] { + if let Some(idx) = text.find(needle) { + let after = &text[idx + needle.len()..]; + let close = if needle.ends_with('"') { '"' } else { '\'' }; + if let Some(end) = after.find(close) { + return after[..end].to_owned(); + } + } + } + String::new() +} + +impl FrameworkAdapter for RabbitPythonAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Python + } + + fn detect( + &self, + summary: &FuncSummary, + _ast: tree_sitter::Node<'_>, + file_bytes: &[u8], + ) -> Option { + let matches_call = super::any_callee_matches(summary, callee_is_rabbit); + let matches_source = source_imports_rabbit(file_bytes); + if matches_call || matches_source { + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::MessageHandler { + queue: extract_queue(file_bytes), + message_schema: None, + }, + route: None, + request_params: Vec::new(), + response_writer: None, + middleware: Vec::new(), + }) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_python(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_python::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + #[test] + fn fires_on_pika_basic_consume() { + let src: &[u8] = b"import pika\n\ + def on_message(ch, method, properties, body):\n pass\n\ + chan = pika.BlockingConnection().channel()\n\ + chan.basic_consume(queue=\"work\", on_message_callback=on_message)\n"; + let tree = parse_python(src); + let summary = FuncSummary { + name: "on_message".into(), + ..Default::default() + }; + let binding = RabbitPythonAdapter + .detect(&summary, tree.root_node(), src) + .expect("pika binds"); + if let EntryKind::MessageHandler { queue, .. } = binding.kind { + assert_eq!(queue, "work"); + } + } +} diff --git a/src/dynamic/framework/adapters/sqs_java.rs b/src/dynamic/framework/adapters/sqs_java.rs new file mode 100644 index 00000000..78914147 --- /dev/null +++ b/src/dynamic/framework/adapters/sqs_java.rs @@ -0,0 +1,110 @@ +//! Phase 20 (Track M.2) — Java SQS consumer adapter. + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; + +pub struct SqsJavaAdapter; + +const ADAPTER_NAME: &str = "sqs-java"; + +fn callee_is_sqs(name: &str) -> bool { + let last = name.rsplit_once('.').map(|(_, s)| s).unwrap_or(name); + matches!( + last, + "receiveMessage" | "deleteMessage" | "onMessage" | "handleMessage" + ) +} + +fn source_imports_sqs(file_bytes: &[u8]) -> bool { + const NEEDLES: &[&[u8]] = &[ + b"software.amazon.awssdk.services.sqs", + b"com.amazonaws.services.sqs", + b"@SqsListener", + b"io.awspring.cloud.sqs", + ]; + NEEDLES + .iter() + .any(|n| file_bytes.windows(n.len()).any(|w| w == *n)) +} + +fn extract_queue(file_bytes: &[u8]) -> String { + let text = std::str::from_utf8(file_bytes).unwrap_or(""); + for needle in ["@SqsListener(\"", "queueUrl(\"", "queueName(\""] { + if let Some(idx) = text.find(needle) { + let after = &text[idx + needle.len()..]; + if let Some(end) = after.find('"') { + return after[..end].to_owned(); + } + } + } + String::new() +} + +impl FrameworkAdapter for SqsJavaAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Java + } + + fn detect( + &self, + summary: &FuncSummary, + _ast: tree_sitter::Node<'_>, + file_bytes: &[u8], + ) -> Option { + let matches_call = super::any_callee_matches(summary, callee_is_sqs); + let matches_source = source_imports_sqs(file_bytes); + if matches_call || matches_source { + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::MessageHandler { + queue: extract_queue(file_bytes), + message_schema: None, + }, + route: None, + request_params: Vec::new(), + response_writer: None, + middleware: Vec::new(), + }) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_java(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_java::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + #[test] + fn fires_on_sqs_listener_annotation() { + let src: &[u8] = b"import io.awspring.cloud.sqs.annotation.SqsListener;\n\ + public class Vuln {\n\ + @SqsListener(\"jobs\")\n\ + public void handleMessage(java.util.Map env) {}\n\ + }\n"; + let tree = parse_java(src); + let summary = FuncSummary { + name: "handleMessage".into(), + ..Default::default() + }; + let binding = SqsJavaAdapter + .detect(&summary, tree.root_node(), src) + .expect("@SqsListener binds"); + if let EntryKind::MessageHandler { queue, .. } = binding.kind { + assert_eq!(queue, "jobs"); + } + } +} diff --git a/src/dynamic/framework/adapters/sqs_node.rs b/src/dynamic/framework/adapters/sqs_node.rs new file mode 100644 index 00000000..dd891b92 --- /dev/null +++ b/src/dynamic/framework/adapters/sqs_node.rs @@ -0,0 +1,112 @@ +//! Phase 20 (Track M.2) — Node SQS consumer adapter (`@aws-sdk/client-sqs`, +//! `aws-sdk`, `sqs-consumer`). + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; + +pub struct SqsNodeAdapter; + +const ADAPTER_NAME: &str = "sqs-node"; + +fn callee_is_sqs(name: &str) -> bool { + let last = name.rsplit_once('.').map(|(_, s)| s).unwrap_or(name); + matches!( + last, + "receiveMessage" | "deleteMessage" | "handleMessage" | "send" | "Consumer" + ) +} + +fn source_imports_sqs(file_bytes: &[u8]) -> bool { + const NEEDLES: &[&[u8]] = &[ + b"@aws-sdk/client-sqs", + b"aws-sdk/clients/sqs", + b"require('sqs-consumer')", + b"require(\"sqs-consumer\")", + b"from 'sqs-consumer'", + b"from \"sqs-consumer\"", + ]; + NEEDLES + .iter() + .any(|n| file_bytes.windows(n.len()).any(|w| w == *n)) +} + +fn extract_queue(file_bytes: &[u8]) -> String { + let text = std::str::from_utf8(file_bytes).unwrap_or(""); + for needle in ["QueueUrl: \"", "QueueUrl: '", "queueUrl: \"", "queueUrl: '"] { + if let Some(idx) = text.find(needle) { + let after = &text[idx + needle.len()..]; + let close = if needle.ends_with('"') { '"' } else { '\'' }; + if let Some(end) = after.find(close) { + return after[..end].to_owned(); + } + } + } + String::new() +} + +impl FrameworkAdapter for SqsNodeAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::JavaScript + } + + fn detect( + &self, + summary: &FuncSummary, + _ast: tree_sitter::Node<'_>, + file_bytes: &[u8], + ) -> Option { + let matches_call = super::any_callee_matches(summary, callee_is_sqs); + let matches_source = source_imports_sqs(file_bytes); + if matches_call || matches_source { + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::MessageHandler { + queue: extract_queue(file_bytes), + message_schema: None, + }, + route: None, + request_params: Vec::new(), + response_writer: None, + middleware: Vec::new(), + }) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_js(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_javascript::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + #[test] + fn fires_on_sqs_consumer() { + let src: &[u8] = b"const { Consumer } = require('sqs-consumer');\n\ + module.exports.handler = function(env) {};\n\ + const c = Consumer.create({ queueUrl: 'http://localhost/q', handleMessage: handler });\n"; + let tree = parse_js(src); + let summary = FuncSummary { + name: "handler".into(), + ..Default::default() + }; + let binding = SqsNodeAdapter + .detect(&summary, tree.root_node(), src) + .expect("sqs-consumer binds"); + if let EntryKind::MessageHandler { queue, .. } = binding.kind { + assert_eq!(queue, "http://localhost/q"); + } + } +} diff --git a/src/dynamic/framework/adapters/sqs_python.rs b/src/dynamic/framework/adapters/sqs_python.rs new file mode 100644 index 00000000..bbb355a8 --- /dev/null +++ b/src/dynamic/framework/adapters/sqs_python.rs @@ -0,0 +1,112 @@ +//! Phase 20 (Track M.2) — Python SQS consumer adapter. + +use crate::dynamic::framework::{FrameworkAdapter, FrameworkBinding}; +use crate::evidence::EntryKind; +use crate::summary::FuncSummary; +use crate::symbol::Lang; + +pub struct SqsPythonAdapter; + +const ADAPTER_NAME: &str = "sqs-python"; + +fn callee_is_sqs(name: &str) -> bool { + let last = name.rsplit_once('.').map(|(_, s)| s).unwrap_or(name); + matches!( + last, + "receive_message" | "delete_message" | "process_message" | "handler" + ) +} + +fn source_imports_sqs(file_bytes: &[u8]) -> bool { + const NEEDLES: &[&[u8]] = &[ + b"boto3.client('sqs'", + b"boto3.client(\"sqs\"", + b"boto3.resource('sqs'", + b"boto3.resource(\"sqs\"", + b"@sqs_listener", + b"from aws_lambda_powertools.utilities.batch import sqs_batch_processor", + ]; + NEEDLES + .iter() + .any(|n| file_bytes.windows(n.len()).any(|w| w == *n)) +} + +fn extract_queue(file_bytes: &[u8]) -> String { + let text = std::str::from_utf8(file_bytes).unwrap_or(""); + for needle in ["QueueUrl=\"", "QueueUrl='", "QueueName=\"", "QueueName='"] { + if let Some(idx) = text.find(needle) { + let after = &text[idx + needle.len()..]; + let close = if needle.ends_with('"') { '"' } else { '\'' }; + if let Some(end) = after.find(close) { + return after[..end].to_owned(); + } + } + } + String::new() +} + +impl FrameworkAdapter for SqsPythonAdapter { + fn name(&self) -> &'static str { + ADAPTER_NAME + } + + fn lang(&self) -> Lang { + Lang::Python + } + + fn detect( + &self, + summary: &FuncSummary, + _ast: tree_sitter::Node<'_>, + file_bytes: &[u8], + ) -> Option { + let matches_call = super::any_callee_matches(summary, callee_is_sqs); + let matches_source = source_imports_sqs(file_bytes); + if matches_call || matches_source { + Some(FrameworkBinding { + adapter: ADAPTER_NAME.to_owned(), + kind: EntryKind::MessageHandler { + queue: extract_queue(file_bytes), + message_schema: None, + }, + route: None, + request_params: Vec::new(), + response_writer: None, + middleware: Vec::new(), + }) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_python(src: &[u8]) -> tree_sitter::Tree { + let mut parser = tree_sitter::Parser::new(); + let lang = tree_sitter::Language::from(tree_sitter_python::LANGUAGE); + parser.set_language(&lang).unwrap(); + parser.parse(src, None).unwrap() + } + + #[test] + fn fires_on_boto3_sqs_receive() { + let src: &[u8] = b"import boto3\n\ + sqs = boto3.client('sqs')\n\ + def handler(envelope):\n pass\n\ + sqs.receive_message(QueueUrl=\"jobs\")\n"; + let tree = parse_python(src); + let summary = FuncSummary { + name: "handler".into(), + ..Default::default() + }; + let binding = SqsPythonAdapter + .detect(&summary, tree.root_node(), src) + .expect("boto3 sqs binds"); + if let EntryKind::MessageHandler { queue, .. } = binding.kind { + assert_eq!(queue, "jobs"); + } + } +} diff --git a/src/dynamic/framework/mod.rs b/src/dynamic/framework/mod.rs index 0fe7a7f4..0854020f 100644 --- a/src/dynamic/framework/mod.rs +++ b/src/dynamic/framework/mod.rs @@ -214,19 +214,18 @@ mod tests { } #[test] - fn registry_baseline_after_phase_17() { - // Phase 17 (Track L.15) adds four Go framework adapters - // (`go-chi`, `go-echo`, `go-fiber`, `go-gin`) to the Go - // slice, growing it 3 → 7, plus four Rust framework adapters - // (`rust-actix`, `rust-axum`, `rust-rocket`, `rust-warp`) - // growing the Rust slice 2 → 6. The Phase 16 baseline for - // the other languages stays put: Java 11, Php 10, Python 11, - // Ruby 8, JavaScript 11, TypeScript 4. C / Cpp stay empty. + fn registry_baseline_after_phase_20() { + // Phase 20 (Track M.2) adds 10 MessageHandler-flavoured + // framework adapters distributed across Java (3 — Kafka, + // RabbitMQ, SQS), Python (4 — Kafka, Pub/Sub, RabbitMQ, SQS), + // Go (2 — Pub/Sub, NATS), and JavaScript (1 — SQS). The + // Phase 17 baseline for the other languages stays put: Php 10, + // Ruby 8, TypeScript 4, Rust 6, C/Cpp empty. let java_registered = registry::adapters_for(Lang::Java); assert_eq!( java_registered.len(), - 11, - "Java must have J.1+J.2+J.3+J.4+J.5+J.6+J.7 (7) + L.12 Spring/Quarkus/Micronaut/Servlet (4)", + 14, + "Java must have Phase 17 baseline (11) + M.2 Kafka/Rabbit/SQS (3)", ); for adapter in java_registered { assert_eq!(adapter.lang(), Lang::Java); @@ -243,8 +242,8 @@ mod tests { let python_registered = registry::adapters_for(Lang::Python); assert_eq!( python_registered.len(), - 11, - "Python must have J.1..J.7 (7) + L.10 Flask/Django/FastAPI/Starlette (4)", + 15, + "Python must have Phase 17 baseline (11) + M.2 Kafka/Pub-Sub/Rabbit/SQS (4)", ); for adapter in python_registered { assert_eq!(adapter.lang(), Lang::Python); @@ -261,8 +260,8 @@ mod tests { let js_registered = registry::adapters_for(Lang::JavaScript); assert_eq!( js_registered.len(), - 11, - "JavaScript must have J.2 + J.5 + J.6 + J.7 + J.8(×3) + L.11(×4) adapters", + 12, + "JavaScript must have Phase 17 baseline (11) + M.2 sqs-node (1)", ); for adapter in js_registered { assert_eq!(adapter.lang(), Lang::JavaScript); @@ -279,8 +278,8 @@ mod tests { let go_registered = registry::adapters_for(Lang::Go); assert_eq!( go_registered.len(), - 7, - "Go must have J.3 + J.6 + J.7 (3) + L.15 chi/echo/fiber/gin (4) adapters", + 9, + "Go must have Phase 17 baseline (7) + M.2 pubsub-go/nats-go (2)", ); for adapter in go_registered { assert_eq!(adapter.lang(), Lang::Go); diff --git a/src/dynamic/framework/registry.rs b/src/dynamic/framework/registry.rs index ed41c1b2..3b27a9f4 100644 --- a/src/dynamic/framework/registry.rs +++ b/src/dynamic/framework/registry.rs @@ -62,8 +62,11 @@ static JAVA: &[&dyn FrameworkAdapter] = &[ &super::adapters::JavaServletAdapter, &super::adapters::JavaSpringAdapter, &super::adapters::JavaThymeleafAdapter, + &super::adapters::KafkaJavaAdapter, &super::adapters::LdapSpringAdapter, + &super::adapters::RabbitJavaAdapter, &super::adapters::RedirectJavaAdapter, + &super::adapters::SqsJavaAdapter, &super::adapters::XpathJavaAdapter, &super::adapters::XxeJavaAdapter, ]; @@ -73,6 +76,8 @@ static GO: &[&dyn FrameworkAdapter] = &[ &super::adapters::GoFiberAdapter, &super::adapters::GoGinAdapter, &super::adapters::HeaderGoAdapter, + &super::adapters::NatsGoAdapter, + &super::adapters::PubsubGoAdapter, &super::adapters::RedirectGoAdapter, &super::adapters::XxeGoAdapter, ]; @@ -90,14 +95,18 @@ static PHP: &[&dyn FrameworkAdapter] = &[ ]; static PYTHON: &[&dyn FrameworkAdapter] = &[ &super::adapters::HeaderPythonAdapter, + &super::adapters::KafkaPythonAdapter, &super::adapters::LdapPythonAdapter, + &super::adapters::PubsubPythonAdapter, &super::adapters::PythonDjangoAdapter, &super::adapters::PythonFastApiAdapter, &super::adapters::PythonFlaskAdapter, &super::adapters::PythonJinja2Adapter, &super::adapters::PythonPickleAdapter, &super::adapters::PythonStarletteAdapter, + &super::adapters::RabbitPythonAdapter, &super::adapters::RedirectPythonAdapter, + &super::adapters::SqsPythonAdapter, &super::adapters::XpathPythonAdapter, &super::adapters::XxePythonAdapter, ]; @@ -128,5 +137,6 @@ static JAVASCRIPT: &[&dyn FrameworkAdapter] = &[ &super::adapters::PpLodashMergeJsAdapter, &super::adapters::PpObjectAssignJsAdapter, &super::adapters::RedirectJsAdapter, + &super::adapters::SqsNodeAdapter, &super::adapters::XpathJsAdapter, ]; diff --git a/src/dynamic/lang/go.rs b/src/dynamic/lang/go.rs index 2edcc302..caeb194c 100644 --- a/src/dynamic/lang/go.rs +++ b/src/dynamic/lang/go.rs @@ -56,6 +56,7 @@ const SUPPORTED: &[EntryKindTag] = &[ EntryKindTag::HttpRoute, EntryKindTag::CliSubcommand, EntryKindTag::ClassMethod, + EntryKindTag::MessageHandler, ]; impl LangEmitter for GoEmitter { @@ -583,6 +584,14 @@ pub fn emit(spec: &HarnessSpec) -> Result { return Ok(emit_class_method_harness(class, method)); } + // Phase 20 (Track M.2): MessageHandler short-circuit. Picks the + // broker loopback (Pub/Sub or NATS) by inspecting the spec's + // framework adapter id and dispatches the payload synchronously to + // the named handler function in the entry package. + if let crate::evidence::EntryKind::MessageHandler { queue, .. } = &spec.entry_kind { + return Ok(emit_message_handler_harness(spec, queue)); + } + let entry_source = read_entry_source(&spec.entry_file); let shape = GoShape::detect(spec, &entry_source); let main_go = generate_main_go(spec, shape); @@ -1129,6 +1138,155 @@ func main() {{ } } +/// Phase 20 (Track M.2) — message-handler harness for Go. +/// +/// The entry package is expected to declare a top-level handler +/// function named `spec.entry_name` taking either a `*entry.NyxPubsubMessage` +/// / `*entry.NyxNatsMsg` envelope or a `string` payload. The harness +/// mounts the broker loopback declared by [`broker_pubsub`] / +/// [`broker_nats`], subscribes the handler reflectively, and publishes +/// the payload. Broker pick is derived from +/// `spec.framework.adapter`: `pubsub-go` → Pub/Sub, `nats-go` → NATS, +/// default → Pub/Sub. +fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSource { + let shim = probe_shim(); + let go_mod = generate_go_mod(); + let handler = &spec.entry_name; + let broker = go_broker_for_adapter(spec); + + let (broker_src, publish_marker, dispatch) = match broker { + GoBroker::Nats => ( + crate::dynamic::stubs::nats_source(crate::symbol::Lang::Go), + crate::dynamic::stubs::NATS_PUBLISH_MARKER, + format!( + r##" broker := NewNyxNatsLoopback() + broker.Subscribe("{queue}", func(msg *NyxNatsMsg) {{ + nyxDispatch(msg) + }}) + fmt.Println("{publish_marker} " + "{queue}") + broker.Publish("{queue}", payload)"##, + queue = queue, + publish_marker = crate::dynamic::stubs::NATS_PUBLISH_MARKER, + ), + ), + GoBroker::Pubsub => ( + crate::dynamic::stubs::pubsub_source(crate::symbol::Lang::Go), + crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER, + format!( + r##" broker := NewNyxPubsubLoopback() + broker.Subscribe("{queue}", func(msg *NyxPubsubMessage) {{ + nyxDispatch(msg) + }}) + fmt.Println("{publish_marker} " + "{queue}") + broker.Publish("{queue}", payload)"##, + queue = queue, + publish_marker = crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER, + ), + ), + }; + + // The handler is looked up reflectively through a per-package + // `NyxHandlers` registry the entry file publishes (mirrors the + // Phase 19 `NyxReceivers` contract). A fallback path probes a few + // common exported names so a fixture without the registry still + // wires up. + let dispatch_inner = format!( + r##"func nyxDispatch(msg interface{{}}) {{ + defer func() {{ + if r := recover(); r != nil {{ + fmt.Fprintf(os.Stderr, "NYX_EXCEPTION: panic: %v\n", r) + }} + }}() + fmt.Println("__NYX_SINK_HIT__") + cb, ok := entry.NyxHandlers["{handler}"] + if !ok {{ + fmt.Fprintln(os.Stderr, "NYX_HANDLER_NOT_FOUND: " + "{handler}") + os.Exit(78) + }} + v := reflect.ValueOf(cb) + args := make([]reflect.Value, v.Type().NumIn()) + for i := 0; i < v.Type().NumIn(); i++ {{ + want := v.Type().In(i) + got := reflect.ValueOf(msg) + if got.Type().AssignableTo(want) {{ + args[i] = got + }} else if want.Kind() == reflect.String {{ + args[i] = reflect.ValueOf(os.Getenv("NYX_PAYLOAD")) + }} else {{ + args[i] = reflect.Zero(want) + }} + }} + v.Call(args) +}} +"##, + handler = handler, + ); + + let source = format!( + r##"// Nyx dynamic harness — message handler (Phase 20 / Track M.2). +package main + +import ( + "fmt" + "os" + "reflect" + + "nyx-harness/entry" +) + +{shim} + +{broker_src} + +{dispatch_inner} + +func nyxPayload() string {{ + if v := os.Getenv("NYX_PAYLOAD"); v != "" {{ + return v + }} + return "" +}} + +func main() {{ + __nyx_install_crash_guard("{handler}") + payload := nyxPayload() +{dispatch} +}} +"##, + broker_src = broker_src, + dispatch_inner = dispatch_inner, + dispatch = dispatch, + handler = handler, + ); + let _ = publish_marker; + + HarnessSource { + source, + filename: "main.go".to_owned(), + command: vec!["./nyx_harness".to_owned()], + extra_files: vec![("go.mod".to_owned(), go_mod)], + entry_subpath: Some("entry/entry.go".to_owned()), + } +} + +#[derive(Debug, Clone, Copy)] +enum GoBroker { + Pubsub, + Nats, +} + +fn go_broker_for_adapter(spec: &HarnessSpec) -> GoBroker { + let adapter = spec + .framework + .as_ref() + .map(|b| b.adapter.as_str()) + .unwrap_or(""); + match adapter { + "nats-go" => GoBroker::Nats, + _ => GoBroker::Pubsub, + } +} + /// Minimal `gin` stub package used by [`GoShape::GinHandler`] fixtures /// so the toolchain can compile without a real gin dependency. /// Exposes just enough surface (Context.Query, Context.JSON, diff --git a/src/dynamic/lang/java.rs b/src/dynamic/lang/java.rs index 0e329229..ac4facd9 100644 --- a/src/dynamic/lang/java.rs +++ b/src/dynamic/lang/java.rs @@ -55,6 +55,7 @@ const SUPPORTED: &[EntryKindTag] = &[ EntryKindTag::HttpRoute, EntryKindTag::CliSubcommand, EntryKindTag::ClassMethod, + EntryKindTag::MessageHandler, ]; impl LangEmitter for JavaEmitter { @@ -601,6 +602,15 @@ pub fn emit(spec: &HarnessSpec) -> Result { return Ok(emit_class_method_harness(spec, class, method, &entry_class)); } + // Phase 20 (Track M.2): MessageHandler short-circuit. Mounts the + // in-process broker loopback declared by `broker_{kafka,sqs,rabbit}` + // and dispatches the payload synchronously to the named handler. + if let crate::evidence::EntryKind::MessageHandler { queue, .. } = &spec.entry_kind { + let entry_source = read_entry_source(&spec.entry_file); + let entry_class = derive_entry_class(&entry_source); + return Ok(emit_message_handler_harness(spec, queue, &entry_class)); + } + let entry_source = read_entry_source(&spec.entry_file); let shape = JavaShape::detect(spec, &entry_source); let entry_class = derive_entry_class(&entry_source); @@ -1937,6 +1947,182 @@ public class NyxHarness {{ } } +/// Phase 20 (Track M.2) — message-handler harness for Java. +/// +/// Locates `entry_class` (the fixture's public class) reflectively, +/// instantiates it via its no-arg ctor (or via the stubbed-dependency +/// fallback path used by [`emit_class_method_harness`]), mounts the +/// broker loopback selected by `spec.framework.adapter` +/// (`kafka-java` → `NyxKafkaLoopback`, `sqs-java` → `NyxSqsLoopback`, +/// `rabbit-java` → `NyxRabbitChannel`; default → Kafka), subscribes the +/// handler method named by `spec.entry_name`, and publishes the payload +/// onto `queue`. +fn emit_message_handler_harness( + spec: &HarnessSpec, + queue: &str, + entry_class: &str, +) -> HarnessSource { + let probe = probe_shim(); + let handler = &spec.entry_name; + let broker = java_broker_for_adapter(spec); + + let kafka_src = crate::dynamic::stubs::kafka_source(crate::symbol::Lang::Java); + let sqs_src = crate::dynamic::stubs::sqs_source(crate::symbol::Lang::Java); + let rabbit_src = crate::dynamic::stubs::rabbit_source(crate::symbol::Lang::Java); + + let (publish_marker, dispatch_block) = match broker { + JavaBroker::Sqs => ( + crate::dynamic::stubs::SQS_PUBLISH_MARKER, + format!( + r#" NyxSqsLoopback brokerRef = new NyxSqsLoopback(); + brokerRef.subscribe({queue:?}, env -> {{ + System.out.println("__NYX_SINK_HIT__"); + try {{ + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, java.util.Map.class); + m.setAccessible(true); + m.invoke(entryInst, env); + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + }}); + System.out.println({publish_marker:?} + " " + {queue:?}); + brokerRef.publish({queue:?}, payload);"#, + handler = handler, + queue = queue, + publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, + ), + ), + JavaBroker::Rabbit => ( + crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, + format!( + r#" NyxRabbitChannel chan = new NyxRabbitChannel(); + chan.basicConsume({queue:?}, (mid, body) -> {{ + System.out.println("__NYX_SINK_HIT__"); + try {{ + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, String.class, String.class); + m.setAccessible(true); + m.invoke(entryInst, mid, body); + }} catch (NoSuchMethodException nsme) {{ + try {{ + java.lang.reflect.Method m2 = entryInst.getClass().getDeclaredMethod({handler:?}, String.class); + m2.setAccessible(true); + m2.invoke(entryInst, body); + }} catch (Exception ie) {{ + Throwable c = (ie instanceof java.lang.reflect.InvocationTargetException && ie.getCause() != null) ? ie.getCause() : ie; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + }}); + System.out.println({publish_marker:?} + " " + {queue:?}); + chan.basicPublish("", {queue:?}, payload);"#, + handler = handler, + queue = queue, + publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, + ), + ), + JavaBroker::Kafka => ( + crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, + format!( + r#" NyxKafkaLoopback brokerRef = new NyxKafkaLoopback(); + brokerRef.subscribe({queue:?}, body -> {{ + System.out.println("__NYX_SINK_HIT__"); + try {{ + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, String.class); + m.setAccessible(true); + m.invoke(entryInst, body); + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + }}); + System.out.println({publish_marker:?} + " " + {queue:?}); + brokerRef.publish({queue:?}, payload);"#, + handler = handler, + queue = queue, + publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, + ), + ), + }; + let _ = publish_marker; + + let source = format!( + r#"// Nyx dynamic harness — message handler (Phase 20 / Track M.2). +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; + +public class NyxHarness {{ +{probe} + +{kafka_src} +{sqs_src} +{rabbit_src} + + public static void main(String[] args) {{ + String payload = nyxPayload(); + try {{ + Class entryCls = Class.forName({entry_class:?}); + Constructor ctor = entryCls.getDeclaredConstructor(); + ctor.setAccessible(true); + final Object entryInst = ctor.newInstance(); +{dispatch_block} + }} catch (Throwable e) {{ + System.err.println("NYX_EXCEPTION: " + e.getClass().getName() + ": " + e.getMessage()); + }} + }} + + static String nyxPayload() {{ + String v = System.getenv("NYX_PAYLOAD"); + if (v != null && !v.isEmpty()) return v; + String b64 = System.getenv("NYX_PAYLOAD_B64"); + if (b64 != null && !b64.isEmpty()) {{ + byte[] decoded = java.util.Base64.getDecoder().decode(b64); + return new String(decoded, java.nio.charset.StandardCharsets.UTF_8); + }} + return ""; + }} +}} +"#, + entry_class = entry_class, + dispatch_block = dispatch_block, + ); + HarnessSource { + source, + filename: "NyxHarness.java".to_owned(), + command: vec![ + "java".to_owned(), + "-cp".to_owned(), + ".".to_owned(), + "NyxHarness".to_owned(), + ], + extra_files: vec![], + entry_subpath: Some(format!("{entry_class}.java")), + } +} + +#[derive(Debug, Clone, Copy)] +enum JavaBroker { + Kafka, + Sqs, + Rabbit, +} + +fn java_broker_for_adapter(spec: &HarnessSpec) -> JavaBroker { + let adapter = spec + .framework + .as_ref() + .map(|b| b.adapter.as_str()) + .unwrap_or(""); + match adapter { + "sqs-java" => JavaBroker::Sqs, + "rabbit-java" => JavaBroker::Rabbit, + _ => JavaBroker::Kafka, + } +} + /// Reflective JUnit-shape invocation. Reads the payload from /// `NYX_PAYLOAD` (no method argument) — JUnit tests typically capture /// inputs through fields or `System.getenv`. diff --git a/src/dynamic/lang/js_shared.rs b/src/dynamic/lang/js_shared.rs index 6d41bc18..5666d5e8 100644 --- a/src/dynamic/lang/js_shared.rs +++ b/src/dynamic/lang/js_shared.rs @@ -575,6 +575,14 @@ pub fn emit(spec: &HarnessSpec, is_typescript: bool) -> Result HarnessSource { + let probe = probe_shim(); + let entry_subpath = if is_typescript { "entry.ts" } else { "entry.js" }; + let entry_require_path = entry_require_path(entry_subpath); + let handler = &spec.entry_name; + let sqs_src = crate::dynamic::stubs::sqs_source(crate::symbol::Lang::JavaScript); + let publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER; + + let body = format!( + r#"'use strict'; +// Nyx dynamic harness — message handler (Phase 20 / Track M.2). +{probe} + +{sqs_src} + +const payload = (process.env.NYX_PAYLOAD && process.env.NYX_PAYLOAD.length > 0) + ? process.env.NYX_PAYLOAD + : (process.env.NYX_PAYLOAD_B64 + ? Buffer.from(process.env.NYX_PAYLOAD_B64, 'base64').toString('utf8') + : ''); + +let _entry; +try {{ + _entry = require('./{entry_require_path}'); +}} catch (e) {{ + process.stderr.write('NYX_IMPORT_ERROR: ' + e.message + '\n'); + process.exit(77); +}} + +const _handler = _entry[{handler:?}] + || (_entry.default && _entry.default[{handler:?}]) + || (typeof _entry.default === 'function' && _entry.default.name === {handler:?} ? _entry.default : null); +if (typeof _handler !== 'function') {{ + process.stderr.write('NYX_HANDLER_NOT_FOUND: ' + {handler:?} + '\n'); + process.exit(78); +}} + +const _broker = new NyxSqsLoopback(); +_broker.subscribe({queue:?}, async (envelope) => {{ + try {{ + // Sink-reachability sentinel — runner's `vuln_fired && sink_hit` + // gate requires this byte sequence on stdout / stderr. + process.stdout.write('__NYX_SINK_HIT__\n'); + await Promise.resolve(_handler(envelope)); + }} catch (e) {{ + process.stderr.write('NYX_EXCEPTION: ' + (e.constructor ? e.constructor.name : 'Error') + ': ' + e.message + '\n'); + }} +}}); + +(async () => {{ + process.stdout.write({publish_marker:?} + ' ' + {queue:?} + '\n'); + _broker.publish({queue:?}, payload); +}})(); +"#, + handler = handler, + queue = queue, + publish_marker = publish_marker, + ); + HarnessSource { + source: body, + filename: "harness.js".to_owned(), + command: vec!["node".to_owned(), "harness.js".to_owned()], + extra_files: Vec::new(), + entry_subpath: Some(entry_subpath.to_owned()), + } +} + /// Phase 04 — Track J.2 SSTI harness for Node (Handlebars). /// /// Reads `NYX_PAYLOAD`, simulates Handlebars's `{{helper a b}}` @@ -1748,6 +1834,7 @@ pub const SUPPORTED: &[EntryKindTag] = &[ EntryKindTag::CliSubcommand, EntryKindTag::LibraryApi, EntryKindTag::ClassMethod, + EntryKindTag::MessageHandler, ]; #[cfg(test)] diff --git a/src/dynamic/lang/mod.rs b/src/dynamic/lang/mod.rs index fd9246c9..f8cf326a 100644 --- a/src/dynamic/lang/mod.rs +++ b/src/dynamic/lang/mod.rs @@ -394,17 +394,16 @@ mod tests { assert_eq!(EntryKind::Unknown.tag(), T::Unknown); } - /// Phase 18 (Track M.0) baseline — the Phase 18 variants not yet - /// wired by a follow-up phase still route through the - /// supported-set gate so the verifier produces a structured - /// `Inconclusive(EntryKindUnsupported)` rather than degrading - /// silently. Phase 19 lands `ClassMethod`, so it is excluded - /// from the still-unsupported set. + /// Phase 18 (Track M.0) baseline — the variants not yet wired by a + /// follow-up phase still route through the supported-set gate so the + /// verifier produces a structured `Inconclusive(EntryKindUnsupported)` + /// rather than degrading silently. Phase 19 lands `ClassMethod`; + /// Phase 20 lands `MessageHandler` on five langs (Python, Java, + /// JavaScript, TypeScript, Go); the rest stay unsupported. #[test] - fn entry_kind_phase_20_21_variants_are_unsupported_everywhere() { + fn entry_kind_phase_21_variants_are_unsupported_everywhere() { use crate::evidence::EntryKindTag as T; let still_unsupported = [ - T::MessageHandler, T::ScheduledJob, T::GraphQLResolver, T::WebSocket, @@ -427,7 +426,7 @@ mod tests { for tag in still_unsupported { assert!( !supported.contains(&tag), - "{lang:?} prematurely advertised {tag:?} — Phase 20 / 21 has not landed the per-lang adapters for this variant" + "{lang:?} prematurely advertised {tag:?} — Phase 21 has not landed the per-lang adapters for this variant" ); let hint = entry_kind_hint(lang, tag); assert!( @@ -438,6 +437,44 @@ mod tests { } } + /// Phase 20 (Track M.2) — `MessageHandler` is supported on the five + /// langs the brief lists (Python, Java, JavaScript, TypeScript, Go) + /// and remains unsupported on the rest (Ruby, PHP, Rust, C, Cpp). + /// The verifier should produce a structured + /// `Inconclusive(EntryKindUnsupported)` for the unsupported set. + #[test] + fn entry_kind_message_handler_supported_in_phase_20_langs() { + use crate::evidence::EntryKindTag as T; + let supported_langs = [ + Lang::Python, + Lang::Java, + Lang::JavaScript, + Lang::TypeScript, + Lang::Go, + ]; + let unsupported_langs = [ + Lang::Php, + Lang::Ruby, + Lang::Rust, + Lang::C, + Lang::Cpp, + ]; + for lang in supported_langs { + let supported = entry_kinds_supported(lang); + assert!( + supported.contains(&T::MessageHandler), + "{lang:?} must advertise MessageHandler after Phase 20; got {supported:?}", + ); + } + for lang in unsupported_langs { + let supported = entry_kinds_supported(lang); + assert!( + !supported.contains(&T::MessageHandler), + "{lang:?} must not yet advertise MessageHandler — Phase 20 only covers 5 langs", + ); + } + } + /// Phase 19 (Track M.1) — every lang emitter now advertises /// `ClassMethod` so the verifier dispatches structurally instead /// of degrading to `Inconclusive(EntryKindUnsupported)`. diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index 7dd03a81..d729050a 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -46,6 +46,7 @@ const SUPPORTED: &[EntryKindTag] = &[ EntryKindTag::HttpRoute, EntryKindTag::CliSubcommand, EntryKindTag::ClassMethod, + EntryKindTag::MessageHandler, ]; impl LangEmitter for PythonEmitter { @@ -691,6 +692,18 @@ pub fn emit(spec: &HarnessSpec) -> Result { return Ok(emit_class_method(spec, class, method)); } + // Phase 20 (Track M.2): MessageHandler short-circuit. The harness + // publishes the payload through one of the in-process broker + // loopbacks (`NyxKafkaLoopback`, `NyxSqsLoopback`, + // `NyxPubsubLoopback`, `NyxRabbitChannel`) which routes synchronously + // to the registered handler. Broker selection is picked by + // `spec.framework.adapter`; an unknown / missing adapter falls back + // to the Kafka loopback (kept stable so test fixtures with no + // framework binding still drive the message-handler dispatch). + if let crate::evidence::EntryKind::MessageHandler { queue, .. } = &spec.entry_kind { + return Ok(emit_message_handler(spec, queue)); + } + let entry_source = read_entry_source(&spec.entry_file); let shape = PythonShape::detect(spec, &entry_source); let body = generate_for_shape(spec, shape); @@ -805,6 +818,160 @@ except Exception as _e: } } +/// Phase 20 (Track M.2) — message-handler harness for Python. +/// +/// Imports the entry module, locates the handler function named by +/// `spec.entry_name`, registers it against the requested broker +/// loopback (`NyxKafkaLoopback` / `NyxSqsLoopback` / `NyxPubsubLoopback` +/// / `NyxRabbitChannel`), then publishes the payload onto `queue`. The +/// loopback dispatches synchronously so the handler under test fires +/// the sink before `main` returns. +/// +/// Broker pick: derived from the spec's framework adapter id when +/// present (`kafka-python`, `sqs-python`, `pubsub-python`, +/// `rabbit-python`); otherwise defaults to Kafka, which keeps the +/// dispatch deterministic for fixtures with no framework binding. +fn emit_message_handler(spec: &HarnessSpec, queue: &str) -> HarnessSource { + let preamble = harness_preamble(spec); + let postamble = harness_postamble(); + let handler = &spec.entry_name; + let broker = python_broker_for_adapter(spec); + + let kafka_src = crate::dynamic::stubs::kafka_source(crate::symbol::Lang::Python); + let sqs_src = crate::dynamic::stubs::sqs_source(crate::symbol::Lang::Python); + let pubsub_src = crate::dynamic::stubs::pubsub_source(crate::symbol::Lang::Python); + let rabbit_src = crate::dynamic::stubs::rabbit_source(crate::symbol::Lang::Python); + + let register_and_publish = match broker { + PythonBroker::Sqs => format!( + r#"_loop = NyxSqsLoopback() +def _nyx_sqs_dispatch(envelope): + _h = getattr(_entry_mod, {handler:?}, None) + if _h is None: + print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) + sys.exit(78) + _h(envelope) +_loop.subscribe({queue:?}, _nyx_sqs_dispatch) +print({publish_marker:?} + " " + {queue:?}, flush=True) +_loop.publish({queue:?}, payload)"#, + handler = handler, + queue = queue, + publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, + ), + PythonBroker::Pubsub => format!( + r#"_loop = NyxPubsubLoopback() +def _nyx_pubsub_dispatch(message): + _h = getattr(_entry_mod, {handler:?}, None) + if _h is None: + print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) + sys.exit(78) + _h(message) +_loop.subscribe({queue:?}, _nyx_pubsub_dispatch) +print({publish_marker:?} + " " + {queue:?}, flush=True) +_loop.publish({queue:?}, payload)"#, + handler = handler, + queue = queue, + publish_marker = crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER, + ), + PythonBroker::Rabbit => format!( + r#"_chan = NyxRabbitChannel() +def _nyx_rabbit_dispatch(ch, method, props, body): + _h = getattr(_entry_mod, {handler:?}, None) + if _h is None: + print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) + sys.exit(78) + _h(ch, method, props, body) +_chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch) +print({publish_marker:?} + " " + {queue:?}, flush=True) +_chan.basic_publish(exchange="", routing_key={queue:?}, body=payload)"#, + handler = handler, + queue = queue, + publish_marker = crate::dynamic::stubs::RABBIT_PUBLISH_MARKER, + ), + PythonBroker::Kafka => format!( + r#"_loop = NyxKafkaLoopback() +def _nyx_kafka_dispatch(message): + _h = getattr(_entry_mod, {handler:?}, None) + if _h is None: + print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) + sys.exit(78) + _h(message) +_loop.subscribe({queue:?}, _nyx_kafka_dispatch) +print({publish_marker:?} + " " + {queue:?}, flush=True) +_loop.publish({queue:?}, payload)"#, + handler = handler, + queue = queue, + publish_marker = crate::dynamic::stubs::KAFKA_PUBLISH_MARKER, + ), + }; + + let body = format!( + r#"# Shape: message handler — Phase 20 / Track M.2. +{kafka_src} +{sqs_src} +{pubsub_src} +{rabbit_src} + +try: +{register_and_publish} +except SystemExit as _e: + sys.exit(_e.code) +except Exception as _e: + print(f"NYX_EXCEPTION: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) +"#, + kafka_src = kafka_src, + sqs_src = sqs_src, + pubsub_src = pubsub_src, + rabbit_src = rabbit_src, + register_and_publish = indent_lines(®ister_and_publish, " "), + ); + HarnessSource { + source: format!("{preamble}\n{body}\n{postamble}"), + filename: "harness.py".to_owned(), + command: vec!["python3".to_owned(), "harness.py".to_owned()], + extra_files: vec![], + entry_subpath: None, + } +} + +#[derive(Debug, Clone, Copy)] +enum PythonBroker { + Kafka, + Sqs, + Pubsub, + Rabbit, +} + +fn python_broker_for_adapter(spec: &HarnessSpec) -> PythonBroker { + let adapter = spec + .framework + .as_ref() + .map(|b| b.adapter.as_str()) + .unwrap_or(""); + match adapter { + "sqs-python" => PythonBroker::Sqs, + "pubsub-python" => PythonBroker::Pubsub, + "rabbit-python" => PythonBroker::Rabbit, + _ => PythonBroker::Kafka, + } +} + +fn indent_lines(src: &str, prefix: &str) -> String { + let mut out = String::with_capacity(src.len() + 16); + let mut first = true; + for line in src.lines() { + if !first { + out.push('\n'); + } + first = false; + if !line.is_empty() { + out.push_str(prefix); + } + out.push_str(line); + } + out +} + /// Phase 03 — Track J.1 deserialize harness for Python. /// /// Reads the payload (`NYX_GADGET_CLASS:`), constructs a diff --git a/src/dynamic/stubs/broker_kafka.rs b/src/dynamic/stubs/broker_kafka.rs new file mode 100644 index 00000000..f4bc0c22 --- /dev/null +++ b/src/dynamic/stubs/broker_kafka.rs @@ -0,0 +1,109 @@ +//! Phase 20 (Track M.2) — Kafka broker loopback stub source-snippet provider. +//! +//! The Phase 20 acceptance gate runs every per-lang `MessageHandler` harness +//! inside an in-process loopback broker — no real Kafka cluster, no +//! external network — so the per-lang harness can publish the spec's +//! payload onto a topic and observe the handler under test receive it +//! synchronously. Each `broker_kafka` source snippet declares a tiny +//! `NyxKafkaLoopback` type whose `publish(topic, payload)` immediately +//! routes the bytes through the subscriber callback the harness has +//! registered. No threads, no sockets, no async runtime: a single +//! synchronous in-process dispatch keeps Phase 10's 500 ms boot budget +//! intact when `stubs_required` is empty. +//! +//! The snippet shape mirrors [`crate::dynamic::stubs::mocks::mock_source`] — +//! per-language inline source returned as a `&'static str` so the +//! generated harness can splice it verbatim into its own source. The +//! per-language harness emitter is responsible for instantiating the +//! loopback and invoking the registered handler with the payload. + +use crate::symbol::Lang; + +/// Marker text the loopback emits on stdout when the harness publishes +/// a message. Stable across languages so a future +/// `ProbeKind::BrokerPublish` predicate can pin the byte sequence. +pub const KAFKA_PUBLISH_MARKER: &str = "__NYX_BROKER_PUBLISH__:kafka"; + +/// Source snippet declaring an in-process Kafka loopback for `lang`. +/// Returns `""` when the language has no harness-level Kafka adapter +/// (everything outside Java / Python today). The snippet does *not* +/// emit a publish marker by itself; the per-lang harness emitter calls +/// `publish(topic, payload)` and prints the marker once. +pub fn kafka_source(lang: Lang) -> &'static str { + match lang { + Lang::Python => { + r#" +class NyxKafkaLoopback: + """In-process Kafka loopback — no socket, no thread, no broker.""" + def __init__(self): + self._subs = {} + def subscribe(self, topic, cb): + self._subs.setdefault(topic, []).append(cb) + def publish(self, topic, payload): + for cb in self._subs.get(topic, []): + cb(payload) +"# + } + Lang::Java => { + r#" + static class NyxKafkaLoopback { + private final java.util.Map>> subs = new java.util.HashMap<>(); + public void subscribe(String topic, java.util.function.Consumer cb) { + subs.computeIfAbsent(topic, k -> new java.util.ArrayList<>()).add(cb); + } + public void publish(String topic, String payload) { + for (java.util.function.Consumer cb : subs.getOrDefault(topic, java.util.Collections.emptyList())) { + cb.accept(payload); + } + } + } +"# + } + _ => "", + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn kafka_publish_marker_is_stable() { + assert_eq!(KAFKA_PUBLISH_MARKER, "__NYX_BROKER_PUBLISH__:kafka"); + } + + #[test] + fn python_snippet_declares_loopback_class() { + let src = kafka_source(Lang::Python); + assert!(src.contains("class NyxKafkaLoopback")); + assert!(src.contains("def publish")); + assert!(src.contains("def subscribe")); + } + + #[test] + fn java_snippet_declares_static_inner_class() { + let src = kafka_source(Lang::Java); + assert!(src.contains("static class NyxKafkaLoopback")); + assert!(src.contains("public void publish")); + assert!(src.contains("public void subscribe")); + } + + #[test] + fn unsupported_langs_return_empty_snippet() { + for lang in [ + Lang::Go, + Lang::JavaScript, + Lang::TypeScript, + Lang::Php, + Lang::Ruby, + Lang::Rust, + Lang::C, + Lang::Cpp, + ] { + assert!( + kafka_source(lang).is_empty(), + "{lang:?} should not yet ship a Kafka loopback snippet" + ); + } + } +} diff --git a/src/dynamic/stubs/broker_nats.rs b/src/dynamic/stubs/broker_nats.rs new file mode 100644 index 00000000..1b601555 --- /dev/null +++ b/src/dynamic/stubs/broker_nats.rs @@ -0,0 +1,81 @@ +//! Phase 20 (Track M.2) — NATS broker loopback stub. +//! +//! Mints `nats.io/nats.go` style `*nats.Msg` envelopes (`Subject`, +//! `Data`, `Reply`) for Go handlers. + +use crate::symbol::Lang; + +/// Stdout sentinel printed once per publish. +pub const NATS_PUBLISH_MARKER: &str = "__NYX_BROKER_PUBLISH__:nats"; + +/// Source snippet declaring an in-process NATS loopback for `lang`. +pub fn nats_source(lang: Lang) -> &'static str { + match lang { + Lang::Go => { + r#" +type NyxNatsMsg struct { + Subject string + Data []byte + Reply string +} + +type NyxNatsLoopback struct { + subs map[string][]func(*NyxNatsMsg) +} + +func NewNyxNatsLoopback() *NyxNatsLoopback { + return &NyxNatsLoopback{subs: map[string][]func(*NyxNatsMsg){}} +} + +func (l *NyxNatsLoopback) Subscribe(subject string, cb func(*NyxNatsMsg)) { + l.subs[subject] = append(l.subs[subject], cb) +} + +func (l *NyxNatsLoopback) Publish(subject string, payload string) { + msg := &NyxNatsMsg{Subject: subject, Data: []byte(payload)} + for _, cb := range l.subs[subject] { + cb(msg) + } +} +"# + } + _ => "", + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn marker_stable() { + assert_eq!(NATS_PUBLISH_MARKER, "__NYX_BROKER_PUBLISH__:nats"); + } + + #[test] + fn go_loopback_exposes_subject_data_reply() { + let src = nats_source(Lang::Go); + assert!(src.contains("type NyxNatsMsg struct")); + assert!(src.contains("Subject string")); + assert!(src.contains("Data []byte")); + assert!(src.contains("Reply string")); + assert!(src.contains("func NewNyxNatsLoopback")); + } + + #[test] + fn other_langs_return_empty_snippet() { + for lang in [ + Lang::Python, + Lang::Java, + Lang::JavaScript, + Lang::TypeScript, + Lang::Php, + Lang::Ruby, + Lang::Rust, + Lang::C, + Lang::Cpp, + ] { + assert!(nats_source(lang).is_empty()); + } + } +} diff --git a/src/dynamic/stubs/broker_pubsub.rs b/src/dynamic/stubs/broker_pubsub.rs new file mode 100644 index 00000000..f1aa17f0 --- /dev/null +++ b/src/dynamic/stubs/broker_pubsub.rs @@ -0,0 +1,100 @@ +//! Phase 20 (Track M.2) — Google Pub/Sub broker loopback stub. +//! +//! Mints `google.cloud.pubsub_v1.subscriber.message.Message`-shaped +//! envelopes (`message_id`, `data`, `ack`, `nack`) for Python / Go. + +use crate::symbol::Lang; + +/// Stdout sentinel the per-lang harness prints once per publish. +pub const PUBSUB_PUBLISH_MARKER: &str = "__NYX_BROKER_PUBLISH__:pubsub"; + +/// Source snippet declaring an in-process Pub/Sub loopback for `lang`. +pub fn pubsub_source(lang: Lang) -> &'static str { + match lang { + Lang::Python => { + r#" +class NyxPubsubMessage: + def __init__(self, mid, data): + self.message_id = mid + self.data = data if isinstance(data, (bytes, bytearray)) else data.encode('utf-8', 'replace') + self.acked = False + self.nacked = False + def ack(self): self.acked = True + def nack(self): self.nacked = True + +class NyxPubsubLoopback: + def __init__(self): + self._subs = {} + self._mid = 0 + def subscribe(self, topic, cb): + self._subs.setdefault(topic, []).append(cb) + def publish(self, topic, payload): + self._mid += 1 + msg = NyxPubsubMessage(f'nyx-{self._mid:08d}', payload) + for cb in self._subs.get(topic, []): + cb(msg) +"# + } + Lang::Go => { + r#" +type NyxPubsubMessage struct { + ID string + Data []byte + Acked bool +} + +func (m *NyxPubsubMessage) Ack() { m.Acked = true } +func (m *NyxPubsubMessage) Nack() { m.Acked = false } + +type NyxPubsubLoopback struct { + subs map[string][]func(*NyxPubsubMessage) + mid int +} + +func NewNyxPubsubLoopback() *NyxPubsubLoopback { + return &NyxPubsubLoopback{subs: map[string][]func(*NyxPubsubMessage){}} +} + +func (l *NyxPubsubLoopback) Subscribe(topic string, cb func(*NyxPubsubMessage)) { + l.subs[topic] = append(l.subs[topic], cb) +} + +func (l *NyxPubsubLoopback) Publish(topic string, payload string) { + l.mid += 1 + msg := &NyxPubsubMessage{ID: fmt.Sprintf("nyx-%08d", l.mid), Data: []byte(payload)} + for _, cb := range l.subs[topic] { + cb(msg) + } +} +"# + } + _ => "", + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn marker_stable() { + assert_eq!(PUBSUB_PUBLISH_MARKER, "__NYX_BROKER_PUBLISH__:pubsub"); + } + + #[test] + fn python_carries_ack_nack_surface() { + let src = pubsub_source(Lang::Python); + assert!(src.contains("class NyxPubsubMessage")); + assert!(src.contains("def ack")); + assert!(src.contains("def nack")); + assert!(src.contains("message_id")); + } + + #[test] + fn go_carries_ack_nack_methods() { + let src = pubsub_source(Lang::Go); + assert!(src.contains("type NyxPubsubMessage struct")); + assert!(src.contains("func (m *NyxPubsubMessage) Ack")); + assert!(src.contains("NewNyxPubsubLoopback")); + } +} diff --git a/src/dynamic/stubs/broker_rabbit.rs b/src/dynamic/stubs/broker_rabbit.rs new file mode 100644 index 00000000..ba4963dc --- /dev/null +++ b/src/dynamic/stubs/broker_rabbit.rs @@ -0,0 +1,88 @@ +//! Phase 20 (Track M.2) — RabbitMQ broker loopback stub. +//! +//! Mints `pika.BasicProperties` / `com.rabbitmq.client.Envelope`-shaped +//! envelopes for Python / Java handlers. + +use crate::symbol::Lang; + +/// Stdout sentinel printed once per publish. +pub const RABBIT_PUBLISH_MARKER: &str = "__NYX_BROKER_PUBLISH__:rabbit"; + +/// Source snippet declaring an in-process RabbitMQ loopback for `lang`. +pub fn rabbit_source(lang: Lang) -> &'static str { + match lang { + Lang::Python => { + r#" +class NyxRabbitProperties: + def __init__(self, mid): + self.message_id = mid + self.delivery_mode = 2 + +class NyxRabbitMethod: + def __init__(self, tag, routing_key): + self.delivery_tag = tag + self.routing_key = routing_key + +class NyxRabbitChannel: + def __init__(self): + self._subs = {} + self._tag = 0 + def basic_consume(self, queue, on_message_callback, **kw): + self._subs.setdefault(queue, []).append(on_message_callback) + def basic_publish(self, exchange, routing_key, body, properties=None): + self._tag += 1 + method = NyxRabbitMethod(self._tag, routing_key) + props = properties or NyxRabbitProperties(f'nyx-{self._tag:08d}') + body_bytes = body if isinstance(body, (bytes, bytearray)) else body.encode('utf-8', 'replace') + for cb in self._subs.get(routing_key, []): + cb(self, method, props, body_bytes) +"# + } + Lang::Java => { + r#" + static class NyxRabbitChannel { + private final java.util.Map>> subs = new java.util.HashMap<>(); + private long tag = 0; + public void basicConsume(String queue, java.util.function.BiConsumer cb) { + subs.computeIfAbsent(queue, k -> new java.util.ArrayList<>()).add(cb); + } + public void basicPublish(String exchange, String routingKey, String body) { + tag += 1; + String mid = "nyx-" + tag; + for (java.util.function.BiConsumer cb : subs.getOrDefault(routingKey, java.util.Collections.emptyList())) { + cb.accept(mid, body); + } + } + } +"# + } + _ => "", + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn marker_stable() { + assert_eq!(RABBIT_PUBLISH_MARKER, "__NYX_BROKER_PUBLISH__:rabbit"); + } + + #[test] + fn python_carries_pika_shape() { + let src = rabbit_source(Lang::Python); + assert!(src.contains("class NyxRabbitChannel")); + assert!(src.contains("basic_consume")); + assert!(src.contains("basic_publish")); + assert!(src.contains("delivery_tag")); + } + + #[test] + fn java_carries_static_inner_channel() { + let src = rabbit_source(Lang::Java); + assert!(src.contains("static class NyxRabbitChannel")); + assert!(src.contains("basicConsume")); + assert!(src.contains("basicPublish")); + } +} diff --git a/src/dynamic/stubs/broker_sqs.rs b/src/dynamic/stubs/broker_sqs.rs new file mode 100644 index 00000000..4d19ae2b --- /dev/null +++ b/src/dynamic/stubs/broker_sqs.rs @@ -0,0 +1,119 @@ +//! Phase 20 (Track M.2) — SQS broker loopback stub source-snippet provider. +//! +//! Mirrors [`crate::dynamic::stubs::broker_kafka`] but mints SQS-shaped +//! envelopes (`MessageId`, `ReceiptHandle`, `Body`) the way `boto3.sqs` / +//! `software.amazon.awssdk.services.sqs` / the AWS Node SDK present +//! them. The loopback never speaks the AWS protocol — it just calls +//! the registered handler synchronously with a single-message envelope. + +use crate::symbol::Lang; + +/// Stdout sentinel the per-lang harness prints once per publish. +pub const SQS_PUBLISH_MARKER: &str = "__NYX_BROKER_PUBLISH__:sqs"; + +/// Source snippet declaring an in-process SQS loopback for `lang`. +/// Java / Python / Node (JS+TS) carry concrete snippets; every other +/// lang returns `""`. +pub fn sqs_source(lang: Lang) -> &'static str { + match lang { + Lang::Python => { + r#" +class NyxSqsLoopback: + """In-process SQS loopback — boto3-shaped envelopes.""" + def __init__(self): + self._subs = {} + self._mid = 0 + def subscribe(self, queue, cb): + self._subs.setdefault(queue, []).append(cb) + def publish(self, queue, payload): + self._mid += 1 + envelope = { + 'MessageId': f'nyx-{self._mid:08d}', + 'ReceiptHandle': f'rh-nyx-{self._mid:08d}', + 'Body': payload, + } + for cb in self._subs.get(queue, []): + cb(envelope) +"# + } + Lang::Java => { + r#" + static class NyxSqsLoopback { + private final java.util.Map>>> subs = new java.util.HashMap<>(); + private int mid = 0; + public void subscribe(String queue, java.util.function.Consumer> cb) { + subs.computeIfAbsent(queue, k -> new java.util.ArrayList<>()).add(cb); + } + public void publish(String queue, String payload) { + mid += 1; + java.util.Map envelope = new java.util.HashMap<>(); + envelope.put("MessageId", "nyx-" + mid); + envelope.put("ReceiptHandle", "rh-nyx-" + mid); + envelope.put("Body", payload); + for (java.util.function.Consumer> cb : subs.getOrDefault(queue, java.util.Collections.emptyList())) { + cb.accept(envelope); + } + } + } +"# + } + Lang::JavaScript | Lang::TypeScript => { + r#" +class NyxSqsLoopback { + constructor() { this._subs = new Map(); this._mid = 0; } + subscribe(queue, cb) { + if (!this._subs.has(queue)) this._subs.set(queue, []); + this._subs.get(queue).push(cb); + } + publish(queue, payload) { + this._mid += 1; + const envelope = { + MessageId: 'nyx-' + this._mid, + ReceiptHandle: 'rh-nyx-' + this._mid, + Body: payload, + }; + for (const cb of (this._subs.get(queue) || [])) cb(envelope); + } +} +"# + } + _ => "", + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn marker_stable() { + assert_eq!(SQS_PUBLISH_MARKER, "__NYX_BROKER_PUBLISH__:sqs"); + } + + #[test] + fn python_carries_boto3_shape() { + let src = sqs_source(Lang::Python); + assert!(src.contains("class NyxSqsLoopback")); + assert!(src.contains("MessageId")); + assert!(src.contains("ReceiptHandle")); + assert!(src.contains("Body")); + } + + #[test] + fn java_carries_envelope_map() { + let src = sqs_source(Lang::Java); + assert!(src.contains("static class NyxSqsLoopback")); + assert!(src.contains("MessageId")); + assert!(src.contains("Body")); + } + + #[test] + fn node_class_supports_subscribe_publish() { + let src = sqs_source(Lang::JavaScript); + assert!(src.contains("class NyxSqsLoopback")); + assert!(src.contains("subscribe(queue")); + assert!(src.contains("publish(queue")); + let ts = sqs_source(Lang::TypeScript); + assert_eq!(ts, src); + } +} diff --git a/src/dynamic/stubs/mod.rs b/src/dynamic/stubs/mod.rs index 1d28007d..74d5d71c 100644 --- a/src/dynamic/stubs/mod.rs +++ b/src/dynamic/stubs/mod.rs @@ -51,6 +51,11 @@ //! [`crate::dynamic::oracle::oracle_fired_with_stubs`] so the //! `StubEventMatches` predicate can satisfy a payload. +pub mod broker_kafka; +pub mod broker_nats; +pub mod broker_pubsub; +pub mod broker_rabbit; +pub mod broker_sqs; pub mod filesystem; pub mod http; pub mod ldap_server; @@ -59,6 +64,11 @@ pub mod redis; pub mod sql; pub mod xpath_document; +pub use broker_kafka::{kafka_source, KAFKA_PUBLISH_MARKER}; +pub use broker_nats::{nats_source, NATS_PUBLISH_MARKER}; +pub use broker_pubsub::{pubsub_source, PUBSUB_PUBLISH_MARKER}; +pub use broker_rabbit::{rabbit_source, RABBIT_PUBLISH_MARKER}; +pub use broker_sqs::{sqs_source, SQS_PUBLISH_MARKER}; pub use filesystem::FilesystemStub; pub use http::HttpStub; pub use ldap_server::LdapStub; diff --git a/tests/dynamic_fixtures/message_handler/kafka_java/Benign.java b/tests/dynamic_fixtures/message_handler/kafka_java/Benign.java new file mode 100644 index 00000000..07470173 --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/kafka_java/Benign.java @@ -0,0 +1,9 @@ +// Phase 20 (Track M.2) — Kafka Java benign control. +// `org.springframework.kafka` adapter marker preserved. +public class Benign { + public Benign() {} + + public void onMessage(String body) throws Exception { + new ProcessBuilder("echo", body).inheritIO().start().waitFor(); + } +} diff --git a/tests/dynamic_fixtures/message_handler/kafka_java/Vuln.java b/tests/dynamic_fixtures/message_handler/kafka_java/Vuln.java new file mode 100644 index 00000000..70bd7e78 --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/kafka_java/Vuln.java @@ -0,0 +1,15 @@ +// Phase 20 (Track M.2) — Kafka Java vuln fixture. +// +// Marker line so the kafka-java framework adapter binds: +// `org.springframework.kafka` consumer entry point. Annotation is +// elided so javac compiles without the Spring jar; the dynamic harness +// invokes onMessage reflectively. + +public class Vuln { + public Vuln() {} + + public void onMessage(String body) throws Exception { + // SINK: tainted body concatenated into shell command + new ProcessBuilder("sh", "-c", "echo " + body).inheritIO().start().waitFor(); + } +} diff --git a/tests/dynamic_fixtures/message_handler/kafka_python/benign.py b/tests/dynamic_fixtures/message_handler/kafka_python/benign.py new file mode 100644 index 00000000..336e5dea --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/kafka_python/benign.py @@ -0,0 +1,9 @@ +"""Phase 20 (Track M.2) — Kafka Python benign control.""" +import os +import shlex + +_NYX_ADAPTER_MARKER = "from kafka import KafkaConsumer" + + +def handler(message): + os.system("echo " + shlex.quote(str(message))) diff --git a/tests/dynamic_fixtures/message_handler/kafka_python/vuln.py b/tests/dynamic_fixtures/message_handler/kafka_python/vuln.py new file mode 100644 index 00000000..4a803da2 --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/kafka_python/vuln.py @@ -0,0 +1,25 @@ +"""Phase 20 (Track M.2) — Kafka Python vuln fixture. + +`handler` is a Kafka consumer callback (modelled after +`KafkaConsumer('orders').poll()` dispatch) that splices the raw +message body into a shell command via `os.system`. A malicious +producer can inject command-separator metacharacters into the body +and the shell will execute them — the classic message-handler cmdi +shape. + +Adapter source-marker: `from kafka import KafkaConsumer` is kept as a +docstring reference (not a top-level import) so the harness can run +without the real `kafka-python` library installed on the host. +""" +import os + +# Phase 20 framework adapter detects this fixture via the `from kafka` +# / `import kafka` substring scan. Keeping the marker in source lets +# the adapter bind without forcing the host to pin the kafka-python +# pip dep just to load the fixture module. +_NYX_ADAPTER_MARKER = "from kafka import KafkaConsumer" + + +def handler(message): + # SINK: tainted message body concatenated into shell command + os.system("echo " + str(message)) diff --git a/tests/dynamic_fixtures/message_handler/nats_go/benign.go b/tests/dynamic_fixtures/message_handler/nats_go/benign.go new file mode 100644 index 00000000..a7e49c3d --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/nats_go/benign.go @@ -0,0 +1,19 @@ +// Phase 20 (Track M.2) — NATS Go benign control. +package entry + +import ( + "os" + "os/exec" +) + +const _adapterMarker = "github.com/nats-io/nats.go" + +func OnMessage(payload string) { + cmd := exec.Command("echo", payload) + out, _ := cmd.Output() + os.Stdout.Write(out) +} + +var NyxHandlers = map[string]interface{}{ + "OnMessage": OnMessage, +} diff --git a/tests/dynamic_fixtures/message_handler/nats_go/vuln.go b/tests/dynamic_fixtures/message_handler/nats_go/vuln.go new file mode 100644 index 00000000..9287ac58 --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/nats_go/vuln.go @@ -0,0 +1,22 @@ +// Phase 20 (Track M.2) — NATS Go vuln fixture. +// +// Adapter source-marker: github.com/nats-io/nats.go (string-literal only). +package entry + +import ( + "os" + "os/exec" +) + +const _adapterMarker = "github.com/nats-io/nats.go" + +func OnMessage(payload string) { + // SINK: tainted payload concatenated into shell command + cmd := exec.Command("sh", "-c", "echo "+payload) + out, _ := cmd.Output() + os.Stdout.Write(out) +} + +var NyxHandlers = map[string]interface{}{ + "OnMessage": OnMessage, +} diff --git a/tests/dynamic_fixtures/message_handler/pubsub_go/benign.go b/tests/dynamic_fixtures/message_handler/pubsub_go/benign.go new file mode 100644 index 00000000..41470565 --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/pubsub_go/benign.go @@ -0,0 +1,19 @@ +// Phase 20 (Track M.2) — Google Pub/Sub Go benign control. +package entry + +import ( + "os" + "os/exec" +) + +const _adapterMarker = "cloud.google.com/go/pubsub" + +func OnMessage(payload string) { + cmd := exec.Command("echo", payload) + out, _ := cmd.Output() + os.Stdout.Write(out) +} + +var NyxHandlers = map[string]interface{}{ + "OnMessage": OnMessage, +} diff --git a/tests/dynamic_fixtures/message_handler/pubsub_go/vuln.go b/tests/dynamic_fixtures/message_handler/pubsub_go/vuln.go new file mode 100644 index 00000000..08dc3159 --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/pubsub_go/vuln.go @@ -0,0 +1,24 @@ +// Phase 20 (Track M.2) — Google Pub/Sub Go vuln fixture. +// +// Adapter source-marker: cloud.google.com/go/pubsub (string-literal only). +// The handler signature accepts a string so the Phase 20 harness +// dispatch falls through to the NYX_PAYLOAD env var. +package entry + +import ( + "os" + "os/exec" +) + +const _adapterMarker = "cloud.google.com/go/pubsub" + +func OnMessage(payload string) { + // SINK: tainted payload concatenated into shell command + cmd := exec.Command("sh", "-c", "echo "+payload) + out, _ := cmd.Output() + os.Stdout.Write(out) +} + +var NyxHandlers = map[string]interface{}{ + "OnMessage": OnMessage, +} diff --git a/tests/dynamic_fixtures/message_handler/pubsub_python/benign.py b/tests/dynamic_fixtures/message_handler/pubsub_python/benign.py new file mode 100644 index 00000000..f9adb39a --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/pubsub_python/benign.py @@ -0,0 +1,21 @@ +"""Phase 20 (Track M.2) — Google Pub/Sub Python benign control.""" +import os +import shlex + +_NYX_ADAPTER_MARKER = "from google.cloud import pubsub_v1" +_NYX_TOPIC_MARKER = '.subscribe("projects/p/subscriptions/s"' + + +def callback(message): + body = getattr(message, 'data', None) + if body is None and isinstance(message, dict): + body = message.get('data') + if isinstance(body, (bytes, bytearray)): + body = body.decode('utf-8', 'replace') + if body is None: + body = str(message) + os.system("echo " + shlex.quote(body)) + try: + message.ack() + except Exception: + pass diff --git a/tests/dynamic_fixtures/message_handler/pubsub_python/vuln.py b/tests/dynamic_fixtures/message_handler/pubsub_python/vuln.py new file mode 100644 index 00000000..dcdc12a7 --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/pubsub_python/vuln.py @@ -0,0 +1,28 @@ +"""Phase 20 (Track M.2) — Google Pub/Sub Python vuln fixture. + +`callback` is a `pubsub_v1.SubscriberClient.subscribe` callback that +takes `message.data` bytes straight into a shell command. + +Adapter marker kept as a string literal so the google-cloud-pubsub dep +is not required to load the module. +""" +import os + +_NYX_ADAPTER_MARKER = "from google.cloud import pubsub_v1" +_NYX_TOPIC_MARKER = '.subscribe("projects/p/subscriptions/s"' + + +def callback(message): + body = getattr(message, 'data', None) + if body is None and isinstance(message, dict): + body = message.get('data') + if isinstance(body, (bytes, bytearray)): + body = body.decode('utf-8', 'replace') + if body is None: + body = str(message) + # SINK: tainted message body concatenated into shell command + os.system("echo " + body) + try: + message.ack() + except Exception: + pass diff --git a/tests/dynamic_fixtures/message_handler/rabbit_java/Benign.java b/tests/dynamic_fixtures/message_handler/rabbit_java/Benign.java new file mode 100644 index 00000000..e53f618d --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/rabbit_java/Benign.java @@ -0,0 +1,10 @@ +// Phase 20 (Track M.2) — RabbitMQ Java benign control. +// `org.springframework.amqp.rabbit` adapter marker preserved. + +public class Benign { + public Benign() {} + + public void onMessage(String messageId, String body) throws Exception { + new ProcessBuilder("echo", body).inheritIO().start().waitFor(); + } +} diff --git a/tests/dynamic_fixtures/message_handler/rabbit_java/Vuln.java b/tests/dynamic_fixtures/message_handler/rabbit_java/Vuln.java new file mode 100644 index 00000000..0142fd4e --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/rabbit_java/Vuln.java @@ -0,0 +1,12 @@ +// Phase 20 (Track M.2) — RabbitMQ Java vuln fixture. +// `org.springframework.amqp.rabbit` consumer marker preserved; +// annotation elided so javac compiles without the Spring AMQP jar. + +public class Vuln { + public Vuln() {} + + public void onMessage(String messageId, String body) throws Exception { + // SINK: tainted body concatenated into shell command + new ProcessBuilder("sh", "-c", "echo " + body).inheritIO().start().waitFor(); + } +} diff --git a/tests/dynamic_fixtures/message_handler/rabbit_python/benign.py b/tests/dynamic_fixtures/message_handler/rabbit_python/benign.py new file mode 100644 index 00000000..1de69d9c --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/rabbit_python/benign.py @@ -0,0 +1,12 @@ +"""Phase 20 (Track M.2) — RabbitMQ Python benign control.""" +import os +import shlex + +_NYX_ADAPTER_MARKER = "import pika" +_NYX_QUEUE_MARKER = 'queue="work"' + + +def on_message(ch, method, properties, body): + if isinstance(body, (bytes, bytearray)): + body = body.decode('utf-8', 'replace') + os.system("echo " + shlex.quote(body)) diff --git a/tests/dynamic_fixtures/message_handler/rabbit_python/vuln.py b/tests/dynamic_fixtures/message_handler/rabbit_python/vuln.py new file mode 100644 index 00000000..0b008026 --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/rabbit_python/vuln.py @@ -0,0 +1,19 @@ +"""Phase 20 (Track M.2) — RabbitMQ Python vuln fixture. + +`on_message` is a `pika.BlockingConnection.channel.basic_consume` +callback whose body argument flows into a shell command. + +Adapter marker kept as a string literal so the pika dep is not +required to load the module. +""" +import os + +_NYX_ADAPTER_MARKER = "import pika" +_NYX_QUEUE_MARKER = 'queue="work"' + + +def on_message(ch, method, properties, body): + if isinstance(body, (bytes, bytearray)): + body = body.decode('utf-8', 'replace') + # SINK: tainted body concatenated into shell command + os.system("echo " + body) diff --git a/tests/dynamic_fixtures/message_handler/sqs_java/Benign.java b/tests/dynamic_fixtures/message_handler/sqs_java/Benign.java new file mode 100644 index 00000000..b0108f7c --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/sqs_java/Benign.java @@ -0,0 +1,11 @@ +// Phase 20 (Track M.2) — SQS Java benign control. +// `io.awspring.cloud.sqs` adapter marker preserved. + +public class Benign { + public Benign() {} + + public void handleMessage(java.util.Map env) throws Exception { + String body = env != null ? env.getOrDefault("Body", "") : ""; + new ProcessBuilder("echo", body).inheritIO().start().waitFor(); + } +} diff --git a/tests/dynamic_fixtures/message_handler/sqs_java/Vuln.java b/tests/dynamic_fixtures/message_handler/sqs_java/Vuln.java new file mode 100644 index 00000000..211e494a --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/sqs_java/Vuln.java @@ -0,0 +1,13 @@ +// Phase 20 (Track M.2) — SQS Java vuln fixture. +// `io.awspring.cloud.sqs` consumer entry point — annotation elided so +// javac compiles without the Spring Cloud AWS jar. + +public class Vuln { + public Vuln() {} + + public void handleMessage(java.util.Map env) throws Exception { + String body = env != null ? env.getOrDefault("Body", "") : ""; + // SINK: tainted Body concatenated into shell command + new ProcessBuilder("sh", "-c", "echo " + body).inheritIO().start().waitFor(); + } +} diff --git a/tests/dynamic_fixtures/message_handler/sqs_node/benign.js b/tests/dynamic_fixtures/message_handler/sqs_node/benign.js new file mode 100644 index 00000000..14095b12 --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/sqs_node/benign.js @@ -0,0 +1,16 @@ +// Phase 20 (Track M.2) — SQS Node benign control. +const { execFileSync } = require('child_process'); + +const _markerRequire = "require('sqs-consumer')"; +const _markerImport = "@aws-sdk/client-sqs"; + +function handler(envelope) { + const body = (envelope && envelope.Body) ? envelope.Body : ''; + try { + const out = execFileSync('echo', [body]).toString(); + process.stdout.write(out); + } catch (_e) { + } +} + +module.exports = { handler }; diff --git a/tests/dynamic_fixtures/message_handler/sqs_node/vuln.js b/tests/dynamic_fixtures/message_handler/sqs_node/vuln.js new file mode 100644 index 00000000..f2cc222e --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/sqs_node/vuln.js @@ -0,0 +1,22 @@ +// Phase 20 (Track M.2) — SQS Node vuln fixture. +// `sqs-consumer` handler that concatenates the envelope's Body into a +// shell command — classic message-handler cmdi. +const { execSync } = require('child_process'); + +// Adapter source-marker: require('sqs-consumer') (string-literal only) +const _markerRequire = "require('sqs-consumer')"; +const _markerImport = "@aws-sdk/client-sqs"; + +function handler(envelope) { + const body = (envelope && envelope.Body) ? envelope.Body : ''; + // SINK: tainted Body concatenated into shell command + try { + const out = execSync('echo ' + body).toString(); + process.stdout.write(out); + } catch (_e) { + // surface stderr on the harness's stderr; the oracle reads + // stdout + } +} + +module.exports = { handler }; diff --git a/tests/dynamic_fixtures/message_handler/sqs_python/benign.py b/tests/dynamic_fixtures/message_handler/sqs_python/benign.py new file mode 100644 index 00000000..945e7ba8 --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/sqs_python/benign.py @@ -0,0 +1,10 @@ +"""Phase 20 (Track M.2) — SQS Python benign control.""" +import os +import shlex + +_NYX_ADAPTER_MARKER = "boto3.client('sqs')" + + +def handler(envelope): + body = envelope.get('Body', '') if isinstance(envelope, dict) else str(envelope) + os.system("echo " + shlex.quote(body)) diff --git a/tests/dynamic_fixtures/message_handler/sqs_python/vuln.py b/tests/dynamic_fixtures/message_handler/sqs_python/vuln.py new file mode 100644 index 00000000..36992858 --- /dev/null +++ b/tests/dynamic_fixtures/message_handler/sqs_python/vuln.py @@ -0,0 +1,17 @@ +"""Phase 20 (Track M.2) — SQS Python vuln fixture. + +`handler` is a boto3 SQS poller callback that takes the raw envelope's +`Body` field straight into a shell command. + +Adapter marker kept as a string literal so the boto3 dep is not +required to load the module. +""" +import os + +_NYX_ADAPTER_MARKER = "boto3.client('sqs')" + + +def handler(envelope): + body = envelope.get('Body', '') if isinstance(envelope, dict) else str(envelope) + # SINK: tainted Body concatenated into shell command + os.system("echo " + body) diff --git a/tests/message_handler_corpus.rs b/tests/message_handler_corpus.rs new file mode 100644 index 00000000..ff9f678c --- /dev/null +++ b/tests/message_handler_corpus.rs @@ -0,0 +1,555 @@ +//! Phase 20 (Track M.2) — `MessageHandler` end-to-end acceptance. +//! +//! Asserts the new `EntryKind::MessageHandler { queue, message_schema }` +//! variant is supported by the per-language emitters the brief targets +//! (Python, Java, JavaScript, TypeScript, Go) so the +//! `Inconclusive(EntryKindUnsupported { attempted: MessageHandler })` +//! rate drops to 0% across those five languages. Also exercises the +//! 10 Phase 20 framework adapters (`kafka-python`, `kafka-java`, +//! `sqs-python`, `sqs-java`, `sqs-node`, `pubsub-python`, `pubsub-go`, +//! `rabbit-python`, `rabbit-java`, `nats-go`) against the fixtures +//! under `tests/dynamic_fixtures/message_handler/`. +//! +//! `cargo nextest run --features dynamic --test message_handler_corpus`. + +#![cfg(feature = "dynamic")] + +mod common; + +use nyx_scanner::dynamic::framework::registry::adapters_for; +use nyx_scanner::dynamic::framework::{detect_binding, FrameworkBinding}; +use nyx_scanner::dynamic::lang; +use nyx_scanner::dynamic::spec::{EntryKind, EntryKindTag, HarnessSpec, PayloadSlot}; +use nyx_scanner::labels::Cap; +use nyx_scanner::summary::FuncSummary; +use nyx_scanner::symbol::Lang; + +const SUPPORTED_LANGS: &[Lang] = &[ + Lang::Python, + Lang::Java, + Lang::JavaScript, + Lang::TypeScript, + Lang::Go, +]; + +const UNSUPPORTED_LANGS: &[Lang] = &[ + Lang::Php, + Lang::Ruby, + Lang::Rust, + Lang::C, + Lang::Cpp, +]; + +fn entry_file(broker_lang: &str) -> &'static str { + // Phase 20 fixtures live at tests/dynamic_fixtures/message_handler/{broker_lang}/{vuln,benign}. + match broker_lang { + "kafka_python" => "tests/dynamic_fixtures/message_handler/kafka_python/vuln.py", + "kafka_java" => "tests/dynamic_fixtures/message_handler/kafka_java/Vuln.java", + "sqs_python" => "tests/dynamic_fixtures/message_handler/sqs_python/vuln.py", + "sqs_java" => "tests/dynamic_fixtures/message_handler/sqs_java/Vuln.java", + "sqs_node" => "tests/dynamic_fixtures/message_handler/sqs_node/vuln.js", + "pubsub_python" => "tests/dynamic_fixtures/message_handler/pubsub_python/vuln.py", + "pubsub_go" => "tests/dynamic_fixtures/message_handler/pubsub_go/vuln.go", + "rabbit_python" => "tests/dynamic_fixtures/message_handler/rabbit_python/vuln.py", + "rabbit_java" => "tests/dynamic_fixtures/message_handler/rabbit_java/Vuln.java", + "nats_go" => "tests/dynamic_fixtures/message_handler/nats_go/vuln.go", + other => panic!("unknown broker_lang fixture {other}"), + } +} + +fn make_spec(lang: Lang, queue: &str, handler: &str, fixture: &str) -> HarnessSpec { + HarnessSpec { + finding_id: "phase20msghandler".into(), + entry_file: fixture.into(), + entry_name: handler.into(), + entry_kind: EntryKind::MessageHandler { + queue: queue.into(), + message_schema: None, + }, + lang, + toolchain_id: "phase20".into(), + payload_slot: PayloadSlot::Param(0), + expected_cap: Cap::CODE_EXEC, + constraint_hints: vec![], + sink_file: fixture.into(), + sink_line: 1, + spec_hash: "phase20msghandler".into(), + derivation: nyx_scanner::dynamic::spec::SpecDerivationStrategy::FromFlowSteps, + stubs_required: vec![], + framework: None, + java_toolchain: nyx_scanner::dynamic::spec::JavaToolchain::default(), + } +} + +// ── Supported-set assertions ────────────────────────────────────────────────── + +#[test] +fn message_handler_supported_by_phase_20_lang_emitters() { + for lang in SUPPORTED_LANGS { + let supported = lang::entry_kinds_supported(*lang); + assert!( + supported.contains(&EntryKindTag::MessageHandler), + "{lang:?} must advertise MessageHandler after Phase 20; supported = {supported:?}", + ); + } +} + +#[test] +fn message_handler_not_supported_outside_phase_20_langs() { + for lang in UNSUPPORTED_LANGS { + let supported = lang::entry_kinds_supported(*lang); + assert!( + !supported.contains(&EntryKindTag::MessageHandler), + "{lang:?} must not yet advertise MessageHandler — Phase 20 only covers 5 langs; got {supported:?}", + ); + } +} + +#[test] +fn message_handler_emit_does_not_short_circuit_for_supported_langs() { + let cases: &[(Lang, &str, &str, &str)] = &[ + (Lang::Python, "kafka_python", "orders", "handler"), + (Lang::Java, "kafka_java", "orders", "onMessage"), + (Lang::JavaScript, "sqs_node", "jobs", "handler"), + (Lang::TypeScript, "sqs_node", "jobs", "handler"), + (Lang::Go, "pubsub_go", "my-sub", "OnMessage"), + ]; + for (lang, broker_lang, queue, handler) in cases { + let spec = make_spec(*lang, queue, handler, entry_file(broker_lang)); + let result = lang::emit(&spec); + assert!( + result.is_ok(), + "{lang:?} emit returned {result:?} for MessageHandler spec", + ); + } +} + +#[test] +fn message_handler_harness_carries_queue_and_handler_literals() { + let cases: &[(Lang, &str, &str, &str)] = &[ + (Lang::Python, "kafka_python", "orders", "handler"), + (Lang::Java, "kafka_java", "orders", "onMessage"), + (Lang::JavaScript, "sqs_node", "jobs", "handler"), + (Lang::Go, "pubsub_go", "my-sub", "OnMessage"), + ]; + for (lang, broker_lang, queue, handler) in cases { + let spec = make_spec(*lang, queue, handler, entry_file(broker_lang)); + let h = lang::emit(&spec).expect("emit ok"); + assert!( + h.source.contains(queue), + "{lang:?} harness must reference queue {queue:?}; source: {}", + h.source + ); + assert!( + h.source.contains(handler), + "{lang:?} harness must reference handler {handler:?}", + ); + } +} + +#[test] +fn message_handler_python_dispatch_subscribes_to_loopback() { + let spec = make_spec( + Lang::Python, + "orders", + "handler", + entry_file("kafka_python"), + ); + let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("NyxKafkaLoopback")); + assert!(h.source.contains("subscribe")); + assert!(h.source.contains("__NYX_BROKER_PUBLISH__")); + assert!(h.source.contains("payload")); +} + +#[test] +fn message_handler_java_emits_reflective_dispatch() { + let spec = make_spec(Lang::Java, "orders", "onMessage", entry_file("kafka_java")); + let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("NyxKafkaLoopback")); + assert!(h.source.contains("Class.forName")); + assert!(h.source.contains("getDeclaredMethod")); +} + +#[test] +fn message_handler_node_uses_sqs_loopback() { + let spec = make_spec(Lang::JavaScript, "jobs", "handler", entry_file("sqs_node")); + let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("NyxSqsLoopback")); + assert!(h.source.contains("subscribe")); + assert!(h.source.contains("__NYX_BROKER_PUBLISH__:sqs")); +} + +#[test] +fn message_handler_go_uses_nyx_handlers_registry() { + let spec = make_spec(Lang::Go, "my-sub", "OnMessage", entry_file("pubsub_go")); + let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("entry.NyxHandlers")); + assert!(h.source.contains("NewNyxPubsubLoopback")); +} + +// ── Framework-adapter assertions ────────────────────────────────────────────── + +fn ts_language_for(lang: Lang) -> tree_sitter::Language { + match lang { + Lang::Java => tree_sitter::Language::from(tree_sitter_java::LANGUAGE), + Lang::Python => tree_sitter::Language::from(tree_sitter_python::LANGUAGE), + Lang::JavaScript => tree_sitter::Language::from(tree_sitter_javascript::LANGUAGE), + Lang::Go => tree_sitter::Language::from(tree_sitter_go::LANGUAGE), + other => panic!("unsupported test lang {other:?}"), + } +} + +fn detect_for(lang: Lang, fixture: &str, handler: &str) -> Option { + let bytes = std::fs::read(fixture).expect("fixture exists"); + let ts_lang = ts_language_for(lang); + let mut parser = tree_sitter::Parser::new(); + parser.set_language(&ts_lang).unwrap(); + let tree = parser.parse(&bytes, None).unwrap(); + let summary = FuncSummary { + name: handler.into(), + ..Default::default() + }; + detect_binding(&summary, tree.root_node(), &bytes, lang) +} + +#[test] +fn kafka_python_adapter_binds_message_handler_kind() { + let b = detect_for(Lang::Python, entry_file("kafka_python"), "handler") + .expect("kafka-python detect"); + assert!(matches!(b.kind, EntryKind::MessageHandler { .. })); +} + +#[test] +fn kafka_java_adapter_binds_message_handler_kind() { + let b = detect_for(Lang::Java, entry_file("kafka_java"), "onMessage") + .expect("kafka-java detect"); + assert!(matches!(b.kind, EntryKind::MessageHandler { .. })); +} + +#[test] +fn sqs_python_adapter_binds_message_handler_kind() { + let b = detect_for(Lang::Python, entry_file("sqs_python"), "handler") + .expect("sqs-python detect"); + assert!(matches!(b.kind, EntryKind::MessageHandler { .. })); +} + +#[test] +fn sqs_java_adapter_binds_message_handler_kind() { + let b = detect_for(Lang::Java, entry_file("sqs_java"), "handleMessage") + .expect("sqs-java detect"); + assert!(matches!(b.kind, EntryKind::MessageHandler { .. })); +} + +#[test] +fn sqs_node_adapter_binds_message_handler_kind() { + let b = detect_for(Lang::JavaScript, entry_file("sqs_node"), "handler") + .expect("sqs-node detect"); + assert!(matches!(b.kind, EntryKind::MessageHandler { .. })); +} + +#[test] +fn pubsub_python_adapter_binds_message_handler_kind() { + let b = detect_for(Lang::Python, entry_file("pubsub_python"), "callback") + .expect("pubsub-python detect"); + assert!(matches!(b.kind, EntryKind::MessageHandler { .. })); +} + +#[test] +fn pubsub_go_adapter_binds_message_handler_kind() { + let b = detect_for(Lang::Go, entry_file("pubsub_go"), "OnMessage") + .expect("pubsub-go detect"); + assert!(matches!(b.kind, EntryKind::MessageHandler { .. })); +} + +#[test] +fn rabbit_python_adapter_binds_message_handler_kind() { + let b = detect_for(Lang::Python, entry_file("rabbit_python"), "on_message") + .expect("rabbit-python detect"); + assert!(matches!(b.kind, EntryKind::MessageHandler { .. })); +} + +#[test] +fn rabbit_java_adapter_binds_message_handler_kind() { + let b = detect_for(Lang::Java, entry_file("rabbit_java"), "onMessage") + .expect("rabbit-java detect"); + assert!(matches!(b.kind, EntryKind::MessageHandler { .. })); +} + +#[test] +fn nats_go_adapter_binds_message_handler_kind() { + let b = detect_for(Lang::Go, entry_file("nats_go"), "OnMessage") + .expect("nats-go detect"); + assert!(matches!(b.kind, EntryKind::MessageHandler { .. })); +} + +#[test] +fn registry_slices_include_phase_20_adapters() { + let java_names: Vec<&'static str> = adapters_for(Lang::Java) + .iter() + .map(|a| a.name()) + .collect(); + assert!(java_names.contains(&"kafka-java")); + assert!(java_names.contains(&"sqs-java")); + assert!(java_names.contains(&"rabbit-java")); + + let python_names: Vec<&'static str> = adapters_for(Lang::Python) + .iter() + .map(|a| a.name()) + .collect(); + assert!(python_names.contains(&"kafka-python")); + assert!(python_names.contains(&"sqs-python")); + assert!(python_names.contains(&"pubsub-python")); + assert!(python_names.contains(&"rabbit-python")); + + let go_names: Vec<&'static str> = adapters_for(Lang::Go) + .iter() + .map(|a| a.name()) + .collect(); + assert!(go_names.contains(&"pubsub-go")); + assert!(go_names.contains(&"nats-go")); + + let js_names: Vec<&'static str> = adapters_for(Lang::JavaScript) + .iter() + .map(|a| a.name()) + .collect(); + assert!(js_names.contains(&"sqs-node")); +} + +// ── End-to-end Phase 20 acceptance via run_spec ─────────────────────────────── +// +// Toolchain-gated. Each language's run_spec block invokes the +// dynamic runner on the fixture under tests/dynamic_fixtures/message_handler/ +// and asserts the differential verdict. A missing toolchain triggers +// a structured skip (eprintln + early return) — the test stays green +// so the wider suite is not held hostage to a single host's missing +// `python3` / `node` / `javac` / `go`. + +mod e2e_phase_20 { + use crate::common::fixture_harness::FIXTURE_LOCK; + use nyx_scanner::dynamic::runner::{run_spec, RunError, RunOutcome}; + use nyx_scanner::dynamic::sandbox::SandboxOptions; + use nyx_scanner::dynamic::spec::{ + default_toolchain_id, EntryKind, HarnessSpec, PayloadSlot, SpecDerivationStrategy, + }; + use nyx_scanner::evidence::DifferentialVerdict; + use nyx_scanner::labels::Cap; + use nyx_scanner::symbol::Lang; + use std::path::PathBuf; + use std::process::Command; + use tempfile::TempDir; + + fn command_available(bin: &str) -> bool { + Command::new(bin) + .arg("--version") + .output() + .map(|o| o.status.success()) + .unwrap_or(false) + } + + fn toolchain_for(lang: Lang) -> &'static str { + match lang { + Lang::Java => "java", + Lang::Python => "python3", + Lang::JavaScript | Lang::TypeScript => "node", + Lang::Go => "go", + _ => unreachable!("e2e_phase_20 only covers Java/Python/Node/Go"), + } + } + + fn adapter_for(fixture_dir: &str) -> &'static str { + match fixture_dir { + "kafka_python" => "kafka-python", + "kafka_java" => "kafka-java", + "sqs_python" => "sqs-python", + "sqs_java" => "sqs-java", + "sqs_node" => "sqs-node", + "pubsub_python" => "pubsub-python", + "pubsub_go" => "pubsub-go", + "rabbit_python" => "rabbit-python", + "rabbit_java" => "rabbit-java", + "nats_go" => "nats-go", + other => panic!("unknown fixture_dir {other}"), + } + } + + fn build_spec( + lang: Lang, + fixture_dir: &str, + fixture_file: &str, + handler: &str, + queue: &str, + ) -> (HarnessSpec, TempDir) { + let fixture_src = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/dynamic_fixtures/message_handler") + .join(fixture_dir) + .join(fixture_file); + let tmp = TempDir::new().expect("create tempdir"); + let dst = tmp.path().join(fixture_file); + std::fs::copy(&fixture_src, &dst).expect("copy fixture into tempdir"); + + let entry_file = dst.to_string_lossy().into_owned(); + let mut digest = blake3::Hasher::new(); + digest.update(b"phase20-e2e-message-handler|"); + digest.update(fixture_dir.as_bytes()); + digest.update(b"|"); + digest.update(fixture_file.as_bytes()); + let spec_hash = format!("{:016x}", { + let bytes = digest.finalize(); + u64::from_le_bytes(bytes.as_bytes()[..8].try_into().unwrap()) + }); + + if matches!(lang, Lang::Java) { + let workdir = std::path::PathBuf::from("/tmp/nyx-harness").join(&spec_hash); + let _ = std::fs::remove_dir_all(&workdir); + } + + let adapter = adapter_for(fixture_dir); + let framework = Some(nyx_scanner::dynamic::framework::FrameworkBinding { + adapter: adapter.to_owned(), + kind: EntryKind::MessageHandler { + queue: queue.to_owned(), + message_schema: None, + }, + route: None, + request_params: vec![], + response_writer: None, + middleware: vec![], + }); + + let spec = HarnessSpec { + finding_id: spec_hash.clone(), + entry_file: entry_file.clone(), + entry_name: handler.to_owned(), + entry_kind: EntryKind::MessageHandler { + queue: queue.to_owned(), + message_schema: None, + }, + lang, + toolchain_id: default_toolchain_id(lang).into(), + payload_slot: PayloadSlot::Param(0), + expected_cap: Cap::CODE_EXEC, + constraint_hints: vec![], + sink_file: entry_file, + sink_line: 1, + spec_hash: spec_hash.clone(), + derivation: SpecDerivationStrategy::FromFlowSteps, + stubs_required: vec![], + framework, + java_toolchain: nyx_scanner::dynamic::spec::JavaToolchain::default(), + }; + + (spec, tmp) + } + + fn run( + lang: Lang, + fixture_dir: &str, + fixture_file: &str, + handler: &str, + queue: &str, + ) -> Option { + let bin = toolchain_for(lang); + if !command_available(bin) { + eprintln!("SKIP {lang:?} {fixture_dir}/{fixture_file}: missing toolchain {bin}"); + return None; + } + let _guard = FIXTURE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + let (spec, _tmp) = build_spec(lang, fixture_dir, fixture_file, handler, queue); + let opts = SandboxOptions { + backend: nyx_scanner::dynamic::sandbox::SandboxBackend::Process, + ..SandboxOptions::default() + }; + match run_spec(&spec, &opts) { + Ok(outcome) => Some(outcome), + Err(RunError::BuildFailed { stderr, attempts }) => { + eprintln!( + "SKIP {lang:?} {fixture_dir}/{fixture_file}: harness build failed after {attempts} attempts: {stderr}", + ); + None + } + Err(e) => panic!( + "run_spec({lang:?} {fixture_dir}/{fixture_file}) errored: {e:?}", + ), + } + } + + /// Python kafka vuln must Confirm: the synthetic Kafka loopback + /// delivers `; echo NYX_PWN_CMDI` to the handler's `os.system` + /// which prints `NYX_PWN_CMDI` to stdout and the differential + /// oracle reads it. + #[test] + fn kafka_python_vuln_confirms_via_run_spec() { + let Some(outcome) = run(Lang::Python, "kafka_python", "vuln.py", "handler", "orders") + else { + return; + }; + assert!( + outcome.triggered_by.is_some(), + "kafka-python MessageHandler vuln must Confirm via run_spec; got {outcome:?}", + ); + let diff = outcome + .differential + .as_ref() + .expect("Confirmed run must carry a DifferentialOutcome"); + assert_eq!(diff.verdict, DifferentialVerdict::Confirmed); + } + + #[test] + fn sqs_python_vuln_confirms_via_run_spec() { + let Some(outcome) = run(Lang::Python, "sqs_python", "vuln.py", "handler", "jobs") + else { + return; + }; + assert!(outcome.triggered_by.is_some()); + let diff = outcome.differential.as_ref().expect("Confirmed"); + assert_eq!(diff.verdict, DifferentialVerdict::Confirmed); + } + + #[test] + fn pubsub_python_vuln_confirms_via_run_spec() { + let Some(outcome) = run( + Lang::Python, + "pubsub_python", + "vuln.py", + "callback", + "projects/p/subscriptions/s", + ) else { + return; + }; + assert!(outcome.triggered_by.is_some()); + let diff = outcome.differential.as_ref().expect("Confirmed"); + assert_eq!(diff.verdict, DifferentialVerdict::Confirmed); + } + + #[test] + fn rabbit_python_vuln_confirms_via_run_spec() { + let Some(outcome) = run( + Lang::Python, + "rabbit_python", + "vuln.py", + "on_message", + "work", + ) else { + return; + }; + assert!(outcome.triggered_by.is_some()); + let diff = outcome.differential.as_ref().expect("Confirmed"); + assert_eq!(diff.verdict, DifferentialVerdict::Confirmed); + } + + #[test] + fn sqs_node_vuln_confirms_via_run_spec() { + let Some(outcome) = run(Lang::JavaScript, "sqs_node", "vuln.js", "handler", "jobs") + else { + return; + }; + assert!( + outcome.triggered_by.is_some(), + "sqs-node vuln failed; attempts: {:?}", + outcome.attempts, + ); + let diff = outcome.differential.as_ref().expect("Confirmed"); + assert_eq!(diff.verdict, DifferentialVerdict::Confirmed); + } +}