diff --git a/src/dynamic/lang/go.rs b/src/dynamic/lang/go.rs index 4fbb9273..720b5b23 100644 --- a/src/dynamic/lang/go.rs +++ b/src/dynamic/lang/go.rs @@ -1367,7 +1367,7 @@ fn generate_go_mod(shape: GoShape) -> String { if !deps.is_empty() { out.push_str("\nrequire (\n"); for (module, version) in deps { - out.push_str("\t"); + out.push('\t'); out.push_str(module); out.push(' '); out.push_str(version); @@ -2120,6 +2120,7 @@ fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSourc nyxDispatch(msg) }}) fmt.Println("{publish_marker} " + "{queue}") + nyxRecordBrokerPublish("NYX_NATS_LOG", "{queue}", payload) broker.Publish("{queue}", payload)"##, queue = queue, publish_marker = crate::dynamic::stubs::NATS_PUBLISH_MARKER, @@ -2134,6 +2135,7 @@ fn emit_message_handler_harness(spec: &HarnessSpec, queue: &str) -> HarnessSourc nyxDispatch(msg) }}) fmt.Println("{publish_marker} " + "{queue}") + nyxRecordBrokerPublish("NYX_PUBSUB_LOG", "{queue}", payload) broker.Publish("{queue}", payload)"##, queue = queue, publish_marker = crate::dynamic::stubs::PUBSUB_PUBLISH_MARKER, @@ -2214,6 +2216,19 @@ func nyxPayload() string {{ return "" }} +func nyxRecordBrokerPublish(envName string, destination string, payload string) {{ + path := os.Getenv(envName) + if path == "" {{ + return + }} + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil {{ + return + }} + defer f.Close() + _, _ = fmt.Fprintf(f, "%s\t%s\n", strings.ReplaceAll(destination, "\t", " "), payload) +}} + func main() {{ __nyx_install_crash_guard("{handler}") payload := nyxPayload() diff --git a/src/dynamic/lang/java.rs b/src/dynamic/lang/java.rs index 94fb33d0..a4ecddea 100644 --- a/src/dynamic/lang/java.rs +++ b/src/dynamic/lang/java.rs @@ -3502,6 +3502,7 @@ fn emit_message_handler_harness( }} }}); System.out.println({publish_marker:?} + " " + {queue:?}); + nyxRecordBrokerPublish("NYX_SQS_LOG", {queue:?}, payload); brokerRef.publish({queue:?}, payload);"#, handler = handler, queue = queue, @@ -3533,6 +3534,7 @@ fn emit_message_handler_harness( }} }}); System.out.println({publish_marker:?} + " " + {queue:?}); + nyxRecordBrokerPublish("NYX_RABBIT_LOG", {queue:?}, payload); chan.basicPublish("", {queue:?}, payload);"#, handler = handler, queue = queue, @@ -3555,6 +3557,7 @@ fn emit_message_handler_harness( }} }}); System.out.println({publish_marker:?} + " " + {queue:?}); + nyxRecordBrokerPublish("NYX_KAFKA_LOG", {queue:?}, payload); brokerRef.publish({queue:?}, payload);"#, handler = handler, queue = queue, @@ -3599,6 +3602,21 @@ public class NyxHarness {{ }} return ""; }} + + static void nyxRecordBrokerPublish(String envName, String destination, String payload) {{ + String path = System.getenv(envName); + if (path == null || path.isEmpty()) return; + String line = destination.replace('\t', ' ') + "\t" + payload + "\n"; + try {{ + java.nio.file.Files.write( + java.nio.file.Paths.get(path), + line.getBytes(java.nio.charset.StandardCharsets.UTF_8), + java.nio.file.StandardOpenOption.CREATE, + java.nio.file.StandardOpenOption.APPEND + ); + }} catch (Exception ignored) {{ + }} + }} }} "#, entry_class = entry_class, diff --git a/src/dynamic/lang/js_shared.rs b/src/dynamic/lang/js_shared.rs index 0c9b80f1..70de48ca 100644 --- a/src/dynamic/lang/js_shared.rs +++ b/src/dynamic/lang/js_shared.rs @@ -920,6 +920,17 @@ if (typeof _handler !== 'function') {{ }} const _broker = new NyxSqsLoopback(); +function _nyxRecordBrokerPublish(envName, destination, body) {{ + const path = process.env[envName] || ''; + if (!path) return; + try {{ + require('fs').appendFileSync( + path, + String(destination).replace(/\t/g, ' ') + '\t' + String(body) + '\n', + 'utf8' + ); + }} catch (_) {{}} +}} _broker.subscribe({queue:?}, async (envelope) => {{ try {{ // Sink-reachability sentinel — runner's `vuln_fired && sink_hit` @@ -933,6 +944,7 @@ _broker.subscribe({queue:?}, async (envelope) => {{ (async () => {{ process.stdout.write({publish_marker:?} + ' ' + {queue:?} + '\n'); + _nyxRecordBrokerPublish('NYX_SQS_LOG', {queue:?}, payload); _broker.publish({queue:?}, payload); }})(); "#, diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index 88f12e68..36013c1d 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -958,6 +958,7 @@ def _nyx_sqs_dispatch(envelope): _h(envelope) _loop.subscribe({queue:?}, _nyx_sqs_dispatch) print({publish_marker:?} + " " + {queue:?}, flush=True) +_nyx_record_broker_publish("NYX_SQS_LOG", {queue:?}, payload) _loop.publish({queue:?}, payload)"#, handler = handler, queue = queue, @@ -973,6 +974,7 @@ def _nyx_pubsub_dispatch(message): _h(message) _loop.subscribe({queue:?}, _nyx_pubsub_dispatch) print({publish_marker:?} + " " + {queue:?}, flush=True) +_nyx_record_broker_publish("NYX_PUBSUB_LOG", {queue:?}, payload) _loop.publish({queue:?}, payload)"#, handler = handler, queue = queue, @@ -988,6 +990,7 @@ def _nyx_rabbit_dispatch(ch, method, props, body): _h(ch, method, props, body) _chan.basic_consume(queue={queue:?}, on_message_callback=_nyx_rabbit_dispatch) print({publish_marker:?} + " " + {queue:?}, flush=True) +_nyx_record_broker_publish("NYX_RABBIT_LOG", {queue:?}, payload) _chan.basic_publish(exchange="", routing_key={queue:?}, body=payload)"#, handler = handler, queue = queue, @@ -1003,6 +1006,7 @@ def _nyx_kafka_dispatch(message): _h(message) _loop.subscribe({queue:?}, _nyx_kafka_dispatch) print({publish_marker:?} + " " + {queue:?}, flush=True) +_nyx_record_broker_publish("NYX_KAFKA_LOG", {queue:?}, payload) _loop.publish({queue:?}, payload)"#, handler = handler, queue = queue, @@ -1017,6 +1021,16 @@ _loop.publish({queue:?}, payload)"#, {pubsub_src} {rabbit_src} +def _nyx_record_broker_publish(env_name, destination, body): + path = os.environ.get(env_name, "") + if not path: + return + try: + with open(path, "a", encoding="utf-8") as f: + f.write(str(destination).replace("\t", " ") + "\t" + str(body) + "\n") + except Exception: + pass + try: {register_and_publish} except SystemExit as _e: diff --git a/src/dynamic/spec.rs b/src/dynamic/spec.rs index 2fd8e6f8..621961d8 100644 --- a/src/dynamic/spec.rs +++ b/src/dynamic/spec.rs @@ -1320,6 +1320,7 @@ fn infer_framework_project_root(entry_path: &Path, lang: Lang) -> Option Option { + match adapter.split_once('-').map(|(broker, _)| broker) { + Some("kafka") => Some(StubKind::Kafka), + Some("sqs") => Some(StubKind::Sqs), + Some("pubsub") => Some(StubKind::Pubsub), + Some("rabbit") => Some(StubKind::Rabbit), + Some("nats") => Some(StubKind::Nats), + _ => None, + } } /// Pick the tree-sitter `Language` for a given [`Lang`]. Returns @@ -2450,6 +2471,11 @@ mod tests { } let fw = spec.framework.as_ref().expect("framework must be set"); assert_eq!(fw.adapter, "kafka-python"); + assert_eq!( + spec.stubs_required, + vec![crate::dynamic::stubs::StubKind::Kafka], + "MessageHandler specs must request the matching broker runtime provider", + ); assert_ne!(pre_hash, spec.spec_hash); } diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs new file mode 100644 index 00000000..5ac9b6bc --- /dev/null +++ b/src/dynamic/stubs/broker.rs @@ -0,0 +1,167 @@ +//! Runtime broker loopback stubs. +//! +//! These providers give broker-shaped harnesses the same lifecycle as +//! SQL, HTTP, Redis, filesystem, and mock stubs: the verifier starts a +//! host-side provider, publishes a stable endpoint into the sandbox +//! environment, and drains structured events after each payload run. +//! The per-language source snippets still provide the in-process +//! delivery API used by today's message-handler harnesses; this +//! provider is the shared recording and routing surface those snippets +//! can use. + +use super::{StubEvent, StubKind, StubProvider, monotonic_ns}; +use std::fs::OpenOptions; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; +use std::sync::Mutex; +use tempfile::TempDir; + +/// Broker-cap stub. Endpoint is a stable loopback URI; the companion +/// recording endpoint is a log file path the sandbox harness can +/// append one publish event per line to. +#[derive(Debug)] +pub struct BrokerStub { + kind: StubKind, + tempdir: Option, + log_path: PathBuf, + cursor: Mutex, +} + +impl BrokerStub { + /// Start a broker stub rooted near `workdir`. + pub fn start(kind: StubKind, workdir: &Path) -> std::io::Result { + debug_assert!(kind.is_broker(), "BrokerStub only supports broker kinds"); + let tempdir = TempDir::new_in(workdir).or_else(|_| TempDir::new())?; + let log_path = tempdir + .path() + .join(format!("nyx_{}_stub.events.log", kind.tag())); + std::fs::File::create(&log_path)?; + Ok(Self { + kind, + tempdir: Some(tempdir), + log_path, + cursor: Mutex::new(0), + }) + } + + /// Path to the append-only event log consumed by `drain_events`. + pub fn log_path(&self) -> &Path { + &self.log_path + } + + /// Host-side helper used by tests and future native broker + /// adapters. The line format is intentionally simple so shell, + /// Java, Python, Node, Go, PHP, Ruby, and Rust harnesses can append + /// it without a JSON dependency: + /// + /// `topicpayload` + pub fn record_publish(&self, destination: &str, payload: &str) -> std::io::Result<()> { + let mut f = OpenOptions::new() + .append(true) + .create(true) + .open(&self.log_path)?; + writeln!(f, "{}\t{}", destination.replace('\t', " "), payload)?; + Ok(()) + } +} + +impl StubProvider for BrokerStub { + fn kind(&self) -> StubKind { + self.kind + } + + fn endpoint(&self) -> String { + format!("loopback://{}", self.kind.tag()) + } + + fn recording_endpoint(&self) -> Option<(&'static str, String)> { + Some(( + self.kind.broker_log_env_var()?, + self.log_path.to_string_lossy().into_owned(), + )) + } + + fn drain_events(&self) -> Vec { + let mut cursor = match self.cursor.lock() { + Ok(g) => g, + Err(_) => return Vec::new(), + }; + let file = match std::fs::File::open(&self.log_path) { + Ok(f) => f, + Err(_) => return Vec::new(), + }; + use std::io::Seek; + let mut reader = BufReader::new(file); + if reader.seek(std::io::SeekFrom::Start(*cursor)).is_err() { + return Vec::new(); + } + + let mut events = Vec::new(); + let mut bytes_read = 0_u64; + let mut buf = String::new(); + loop { + buf.clear(); + let n = match reader.read_line(&mut buf) { + Ok(0) => break, + Ok(n) => n, + Err(_) => break, + }; + bytes_read += n as u64; + let line = buf.trim_end_matches(['\r', '\n']); + if line.is_empty() { + continue; + } + let (destination, payload) = line.split_once('\t').unwrap_or((line, "")); + let event = StubEvent { + kind: self.kind, + captured_at_ns: monotonic_ns(), + summary: format!("publish {destination}"), + detail: std::collections::BTreeMap::from([ + ("destination".to_owned(), destination.to_owned()), + ("payload".to_owned(), payload.to_owned()), + ]), + }; + events.push(event); + } + *cursor += bytes_read; + events + } +} + +impl Drop for BrokerStub { + fn drop(&mut self) { + self.tempdir.take(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[test] + fn broker_start_creates_recording_log() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Kafka, dir.path()).unwrap(); + assert!(stub.log_path().exists()); + assert_eq!(stub.endpoint(), "loopback://kafka"); + assert_eq!( + stub.recording_endpoint().unwrap().0, + StubKind::Kafka.broker_log_env_var().unwrap() + ); + } + + #[test] + fn broker_publish_lands_in_drain_events() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Sqs, dir.path()).unwrap(); + stub.record_publish("queue-a", "NYX_PWN_CMDI").unwrap(); + let events = stub.drain_events(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].kind, StubKind::Sqs); + assert_eq!(events[0].summary, "publish queue-a"); + assert_eq!(events[0].detail.get("destination").unwrap(), "queue-a"); + assert_eq!(events[0].detail.get("payload").unwrap(), "NYX_PWN_CMDI"); + assert!(stub.drain_events().is_empty(), "drain cursor must advance"); + } +} diff --git a/src/dynamic/stubs/mod.rs b/src/dynamic/stubs/mod.rs index 3c16df30..049d6f02 100644 --- a/src/dynamic/stubs/mod.rs +++ b/src/dynamic/stubs/mod.rs @@ -51,6 +51,7 @@ //! [`crate::dynamic::oracle::oracle_fired_with_stubs`] so the //! `StubEventMatches` predicate can satisfy a payload. +pub mod broker; pub mod broker_kafka; pub mod broker_nats; pub mod broker_pubsub; @@ -65,6 +66,7 @@ pub mod redis; pub mod sql; pub mod xpath_document; +pub use broker::BrokerStub; pub use broker_kafka::{KAFKA_PUBLISH_MARKER, kafka_source}; pub use broker_nats::{NATS_PUBLISH_MARKER, nats_source}; pub use broker_pubsub::{PUBSUB_PUBLISH_MARKER, pubsub_source}; @@ -111,6 +113,16 @@ pub enum StubKind { MockDatabaseConnection, /// Runtime provider for an injectable logger test double. MockLogger, + /// Runtime provider for a Kafka-shaped broker loopback. + Kafka, + /// Runtime provider for an SQS-shaped broker loopback. + Sqs, + /// Runtime provider for a Google Pub/Sub-shaped broker loopback. + Pubsub, + /// Runtime provider for a RabbitMQ-shaped broker loopback. + Rabbit, + /// Runtime provider for a NATS-shaped broker loopback. + Nats, } impl StubKind { @@ -128,6 +140,11 @@ impl StubKind { StubKind::MockHttpClient => "NYX_MOCK_HTTP_CLIENT_ENDPOINT", StubKind::MockDatabaseConnection => "NYX_MOCK_DATABASE_CONNECTION_ENDPOINT", StubKind::MockLogger => "NYX_MOCK_LOGGER_ENDPOINT", + StubKind::Kafka => "NYX_KAFKA_ENDPOINT", + StubKind::Sqs => "NYX_SQS_ENDPOINT", + StubKind::Pubsub => "NYX_PUBSUB_ENDPOINT", + StubKind::Rabbit => "NYX_RABBIT_ENDPOINT", + StubKind::Nats => "NYX_NATS_ENDPOINT", } } @@ -144,6 +161,32 @@ impl StubKind { StubKind::MockHttpClient => "mock_http_client", StubKind::MockDatabaseConnection => "mock_database_connection", StubKind::MockLogger => "mock_logger", + StubKind::Kafka => "kafka", + StubKind::Sqs => "sqs", + StubKind::Pubsub => "pubsub", + StubKind::Rabbit => "rabbit", + StubKind::Nats => "nats", + } + } + + /// True for message-broker provider kinds. + pub const fn is_broker(self) -> bool { + matches!( + self, + StubKind::Kafka | StubKind::Sqs | StubKind::Pubsub | StubKind::Rabbit | StubKind::Nats + ) + } + + /// Companion log env var used by broker loopback harnesses to + /// append publish observations that the host drains as `StubEvent`s. + pub const fn broker_log_env_var(self) -> Option<&'static str> { + match self { + StubKind::Kafka => Some("NYX_KAFKA_LOG"), + StubKind::Sqs => Some("NYX_SQS_LOG"), + StubKind::Pubsub => Some("NYX_PUBSUB_LOG"), + StubKind::Rabbit => Some("NYX_RABBIT_LOG"), + StubKind::Nats => Some("NYX_NATS_LOG"), + _ => None, } } @@ -291,6 +334,11 @@ impl StubHarness { Arc::new(MockStub::start(MockKind::DatabaseConnection, workdir)?) } StubKind::MockLogger => Arc::new(MockStub::start(MockKind::Logger, workdir)?), + StubKind::Kafka + | StubKind::Sqs + | StubKind::Pubsub + | StubKind::Rabbit + | StubKind::Nats => Arc::new(BrokerStub::start(k, workdir)?), }; stubs.push(stub); } @@ -374,6 +422,11 @@ mod tests { StubKind::MockHttpClient, StubKind::MockDatabaseConnection, StubKind::MockLogger, + StubKind::Kafka, + StubKind::Sqs, + StubKind::Pubsub, + StubKind::Rabbit, + StubKind::Nats, ] .iter() .map(|k| k.env_var()) @@ -445,6 +498,7 @@ mod tests { StubKind::Sql, StubKind::Filesystem, StubKind::MockHttpClient, + StubKind::Kafka, ], dir.path(), ) @@ -454,9 +508,39 @@ mod tests { assert!(names.contains(&"NYX_FS_ROOT")); assert!(names.contains(&"NYX_MOCK_HTTP_CLIENT_ENDPOINT")); assert!(names.contains(&"NYX_MOCK_HTTP_CLIENT_LOG")); + assert!(names.contains(&"NYX_KAFKA_ENDPOINT")); + assert!(names.contains(&"NYX_KAFKA_LOG")); assert_eq!(StubKind::Http.env_var(), "NYX_HTTP_ENDPOINT"); } + #[test] + fn broker_kinds_start_as_runtime_providers() { + let dir = TempDir::new().unwrap(); + let h = StubHarness::start( + &[ + StubKind::Kafka, + StubKind::Sqs, + StubKind::Pubsub, + StubKind::Rabbit, + StubKind::Nats, + ], + dir.path(), + ) + .unwrap(); + assert_eq!(h.len(), 5); + let pairs = h.endpoints(); + for (endpoint, log) in [ + ("NYX_KAFKA_ENDPOINT", "NYX_KAFKA_LOG"), + ("NYX_SQS_ENDPOINT", "NYX_SQS_LOG"), + ("NYX_PUBSUB_ENDPOINT", "NYX_PUBSUB_LOG"), + ("NYX_RABBIT_ENDPOINT", "NYX_RABBIT_LOG"), + ("NYX_NATS_ENDPOINT", "NYX_NATS_LOG"), + ] { + assert!(pairs.iter().any(|(name, _)| *name == endpoint)); + assert!(pairs.iter().any(|(name, _)| *name == log)); + } + } + #[test] fn endpoints_includes_sql_recording_path_companion_var() { let dir = TempDir::new().unwrap(); diff --git a/tests/message_handler_corpus.rs b/tests/message_handler_corpus.rs index c06c7b5d..33c7251d 100644 --- a/tests/message_handler_corpus.rs +++ b/tests/message_handler_corpus.rs @@ -157,6 +157,8 @@ fn message_handler_python_dispatch_subscribes_to_loopback() { assert!(h.source.contains("NyxKafkaLoopback")); assert!(h.source.contains("subscribe")); assert!(h.source.contains("__NYX_BROKER_PUBLISH__")); + assert!(h.source.contains("NYX_KAFKA_LOG")); + assert!(h.source.contains("_nyx_record_broker_publish")); assert!(h.source.contains("payload")); } @@ -167,6 +169,8 @@ fn message_handler_java_emits_reflective_dispatch() { assert!(h.source.contains("NyxKafkaLoopback")); assert!(h.source.contains("Class.forName")); assert!(h.source.contains("getDeclaredMethod")); + assert!(h.source.contains("NYX_KAFKA_LOG")); + assert!(h.source.contains("nyxRecordBrokerPublish")); } #[test] @@ -176,6 +180,8 @@ fn message_handler_node_uses_sqs_loopback() { assert!(h.source.contains("NyxSqsLoopback")); assert!(h.source.contains("subscribe")); assert!(h.source.contains("__NYX_BROKER_PUBLISH__:sqs")); + assert!(h.source.contains("NYX_SQS_LOG")); + assert!(h.source.contains("_nyxRecordBrokerPublish")); } #[test] @@ -184,6 +190,8 @@ fn message_handler_go_uses_nyx_handlers_registry() { let h = lang::emit(&spec).expect("emit ok"); assert!(h.source.contains("entry.NyxHandlers")); assert!(h.source.contains("NewNyxPubsubLoopback")); + assert!(h.source.contains("NYX_PUBSUB_LOG")); + assert!(h.source.contains("nyxRecordBrokerPublish")); } // ── Framework-adapter assertions ──────────────────────────────────────────────