diff --git a/src/dynamic/lang/java.rs b/src/dynamic/lang/java.rs index 20de0bfa..e0e4d10c 100644 --- a/src/dynamic/lang/java.rs +++ b/src/dynamic/lang/java.rs @@ -3803,25 +3803,27 @@ fn emit_message_handler_harness( JavaBroker::Sqs => ( crate::dynamic::stubs::SQS_PUBLISH_MARKER, format!( - r#" NyxSqsLoopback brokerRef = new NyxSqsLoopback(); - System.out.println({publish_marker:?} + " " + {queue:?}); - nyxRecordBrokerPublish("NYX_SQS_LOG", {queue:?}, payload); - brokerRef.publish({queue:?}, payload); - for (java.util.Map env : brokerRef.receiveMessage({queue:?}, 1)) {{ - nyxRecordBrokerEvent("NYX_SQS_LOG", "deliver", {queue:?}, env.getOrDefault("Body", "")); - System.out.println("__NYX_SINK_HIT__"); - boolean success = false; - try {{ - java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, java.util.Map.class); - m.setAccessible(true); - m.invoke(entryInst, env); - success = true; - }} catch (Exception e) {{ - Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; - System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); - }} - if (success && brokerRef.deleteMessage({queue:?}, env.getOrDefault("ReceiptHandle", ""))) {{ - nyxRecordBrokerEvent("NYX_SQS_LOG", "ack", {queue:?}, env.getOrDefault("ReceiptHandle", "")); + r#" if (!nyxTryRealSqs({queue:?}, payload, entryInst, {handler:?})) {{ + NyxSqsLoopback brokerRef = new NyxSqsLoopback(); + System.out.println({publish_marker:?} + " " + {queue:?}); + nyxRecordBrokerPublish("NYX_SQS_LOG", {queue:?}, payload); + brokerRef.publish({queue:?}, payload); + for (java.util.Map env : brokerRef.receiveMessage({queue:?}, 1)) {{ + nyxRecordBrokerEvent("NYX_SQS_LOG", "deliver", {queue:?}, env.getOrDefault("Body", "")); + System.out.println("__NYX_SINK_HIT__"); + boolean success = false; + try {{ + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod({handler:?}, java.util.Map.class); + m.setAccessible(true); + m.invoke(entryInst, env); + success = true; + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + if (success && brokerRef.deleteMessage({queue:?}, env.getOrDefault("ReceiptHandle", ""))) {{ + nyxRecordBrokerEvent("NYX_SQS_LOG", "ack", {queue:?}, env.getOrDefault("ReceiptHandle", "")); + }} }} }}"#, handler = handler, @@ -3926,6 +3928,97 @@ public class NyxHarness {{ }} }} + static boolean nyxTryRealSqs(String queue, String payload, Object entryInst, String handler) {{ + String endpoint = System.getenv("NYX_SQS_ENDPOINT"); + if (endpoint == null || !(endpoint.startsWith("http://") || endpoint.startsWith("https://"))) {{ + return false; + }} + Object client = null; + try {{ + Class sqsClientClass = Class.forName("software.amazon.awssdk.services.sqs.SqsClient"); + Object builder = sqsClientClass.getMethod("builder").invoke(null); + Class regionClass = Class.forName("software.amazon.awssdk.regions.Region"); + Object region = regionClass.getMethod("of", String.class).invoke(null, "us-east-1"); + builder.getClass().getMethod("endpointOverride", java.net.URI.class) + .invoke(builder, java.net.URI.create(endpoint)); + builder.getClass().getMethod("region", regionClass).invoke(builder, region); + + Class basicCredentialsClass = Class.forName("software.amazon.awssdk.auth.credentials.AwsBasicCredentials"); + Class credentialsClass = Class.forName("software.amazon.awssdk.auth.credentials.AwsCredentials"); + Class providerClass = Class.forName("software.amazon.awssdk.auth.credentials.StaticCredentialsProvider"); + Class providerInterface = Class.forName("software.amazon.awssdk.auth.credentials.AwsCredentialsProvider"); + Object credentials = basicCredentialsClass.getMethod("create", String.class, String.class) + .invoke(null, "nyx", "nyx"); + Object provider = providerClass.getMethod("create", credentialsClass).invoke(null, credentials); + builder.getClass().getMethod("credentialsProvider", providerInterface).invoke(builder, provider); + client = builder.getClass().getMethod("build").invoke(builder); + + String queueUrl = endpoint.replaceAll("/+$", "") + "/" + queue.replaceAll("^/+", ""); + System.out.println({sqs_publish_marker:?} + " " + queue); + + Class sendReqClass = Class.forName("software.amazon.awssdk.services.sqs.model.SendMessageRequest"); + Object sendBuilder = sendReqClass.getMethod("builder").invoke(null); + sendBuilder.getClass().getMethod("queueUrl", String.class).invoke(sendBuilder, queueUrl); + sendBuilder.getClass().getMethod("messageBody", String.class).invoke(sendBuilder, payload); + Object sendReq = sendBuilder.getClass().getMethod("build").invoke(sendBuilder); + sqsClientClass.getMethod("sendMessage", sendReqClass).invoke(client, sendReq); + + Class receiveReqClass = Class.forName("software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest"); + Object receiveBuilder = receiveReqClass.getMethod("builder").invoke(null); + receiveBuilder.getClass().getMethod("queueUrl", String.class).invoke(receiveBuilder, queueUrl); + receiveBuilder.getClass().getMethod("maxNumberOfMessages", Integer.class).invoke(receiveBuilder, Integer.valueOf(1)); + receiveBuilder.getClass().getMethod("waitTimeSeconds", Integer.class).invoke(receiveBuilder, Integer.valueOf(0)); + Object receiveReq = receiveBuilder.getClass().getMethod("build").invoke(receiveBuilder); + Object receiveResp = sqsClientClass.getMethod("receiveMessage", receiveReqClass).invoke(client, receiveReq); + java.util.List messages = (java.util.List) receiveResp.getClass().getMethod("messages").invoke(receiveResp); + if (messages == null || messages.isEmpty()) {{ + return false; + }} + + for (Object msg : messages) {{ + String body = String.valueOf(msg.getClass().getMethod("body").invoke(msg)); + String receipt = String.valueOf(msg.getClass().getMethod("receiptHandle").invoke(msg)); + String messageId = String.valueOf(msg.getClass().getMethod("messageId").invoke(msg)); + java.util.Map env = new java.util.HashMap<>(); + env.put("Body", body); + env.put("ReceiptHandle", receipt); + env.put("MessageId", messageId); + System.out.println("__NYX_SINK_HIT__"); + boolean success = false; + try {{ + java.lang.reflect.Method m = entryInst.getClass().getDeclaredMethod(handler, java.util.Map.class); + m.setAccessible(true); + m.invoke(entryInst, env); + success = true; + }} catch (Exception e) {{ + Throwable c = (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) ? e.getCause() : e; + System.err.println("NYX_EXCEPTION: " + c.getClass().getName() + ": " + c.getMessage()); + }} + if (success && receipt != null && !receipt.isEmpty()) {{ + Class deleteReqClass = Class.forName("software.amazon.awssdk.services.sqs.model.DeleteMessageRequest"); + Object deleteBuilder = deleteReqClass.getMethod("builder").invoke(null); + deleteBuilder.getClass().getMethod("queueUrl", String.class).invoke(deleteBuilder, queueUrl); + deleteBuilder.getClass().getMethod("receiptHandle", String.class).invoke(deleteBuilder, receipt); + Object deleteReq = deleteBuilder.getClass().getMethod("build").invoke(deleteBuilder); + sqsClientClass.getMethod("deleteMessage", deleteReqClass).invoke(client, deleteReq); + }} + }} + return true; + }} catch (ClassNotFoundException missingSdk) {{ + return false; + }} catch (Throwable e) {{ + System.err.println("NYX_REAL_SQS_FALLBACK: " + e.getClass().getName() + ": " + e.getMessage()); + return false; + }} finally {{ + if (client instanceof AutoCloseable) {{ + try {{ + ((AutoCloseable) client).close(); + }} catch (Exception ignored) {{ + }} + }} + }} + }} + static String nyxPayload() {{ String v = System.getenv("NYX_PAYLOAD"); if (v != null && !v.isEmpty()) return v; @@ -3959,6 +4052,7 @@ public class NyxHarness {{ "#, entry_class = entry_class, dispatch_block = dispatch_block, + sqs_publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, ); HarnessSource { source, @@ -3966,7 +4060,7 @@ public class NyxHarness {{ command: vec![ "java".to_owned(), "-cp".to_owned(), - ".".to_owned(), + ".:lib/*".to_owned(), "NyxHarness".to_owned(), ], extra_files: { diff --git a/src/dynamic/lang/js_shared.rs b/src/dynamic/lang/js_shared.rs index d789552e..0fbec238 100644 --- a/src/dynamic/lang/js_shared.rs +++ b/src/dynamic/lang/js_shared.rs @@ -945,6 +945,53 @@ function _nyxRecordBrokerPublish(envName, destination, body) {{ _nyxRecordBrokerEvent(envName, 'publish', destination, body); }} +async function _nyxTryRealSqs(queue, body) {{ + const endpoint = process.env.NYX_SQS_ENDPOINT || ''; + if (!/^https?:\/\//.test(endpoint)) return false; + let sqs; + try {{ + sqs = require('@aws-sdk/client-sqs'); + }} catch (_) {{ + return false; + }} + try {{ + const client = new sqs.SQSClient({{ + endpoint, + region: 'us-east-1', + credentials: {{ accessKeyId: 'nyx', secretAccessKey: 'nyx' }}, + maxAttempts: 1, + }}); + const queueUrl = endpoint.replace(/\/$/, '') + '/' + String(queue).replace(/^\//, ''); + process.stdout.write({publish_marker:?} + ' ' + queue + '\n'); + await client.send(new sqs.SendMessageCommand({{ + QueueUrl: queueUrl, + MessageBody: String(body), + }})); + const response = await client.send(new sqs.ReceiveMessageCommand({{ + QueueUrl: queueUrl, + MaxNumberOfMessages: 1, + WaitTimeSeconds: 0, + AttributeNames: ['ApproximateReceiveCount'], + }})); + const messages = response.Messages || []; + if (messages.length === 0) return false; + for (const envelope of messages) {{ + const ok = await _nyxDispatchEnvelope(envelope); + if (ok && envelope.ReceiptHandle) {{ + await client.send(new sqs.DeleteMessageCommand({{ + QueueUrl: queueUrl, + ReceiptHandle: envelope.ReceiptHandle, + }})); + }} + }} + if (typeof client.destroy === 'function') client.destroy(); + return true; + }} catch (e) {{ + process.stderr.write('NYX_REAL_SQS_FALLBACK: ' + (e && e.message ? e.message : String(e)) + '\n'); + return false; + }} +}} + async function _nyxDispatchEnvelope(envelope) {{ try {{ // Sink-reachability sentinel — runner's `vuln_fired && sink_hit` @@ -959,6 +1006,7 @@ async function _nyxDispatchEnvelope(envelope) {{ }} (async () => {{ + if (await _nyxTryRealSqs({queue:?}, payload)) return; process.stdout.write({publish_marker:?} + ' ' + {queue:?} + '\n'); _nyxRecordBrokerPublish('NYX_SQS_LOG', {queue:?}, payload); _broker.publish({queue:?}, payload); diff --git a/src/dynamic/lang/python.rs b/src/dynamic/lang/python.rs index 5dd1b335..6c39d9e0 100644 --- a/src/dynamic/lang/python.rs +++ b/src/dynamic/lang/python.rs @@ -958,22 +958,23 @@ fn emit_message_handler(spec: &HarnessSpec, queue: &str) -> HarnessSource { let register_and_publish = match broker { PythonBroker::Sqs => format!( - r#"_loop = NyxSqsLoopback() -def _nyx_sqs_dispatch(envelope): - _h = getattr(_entry_mod, {handler:?}, None) - if _h is None: - print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) - sys.exit(78) - _h(envelope) -_loop.subscribe({queue:?}, _nyx_sqs_dispatch) -print({publish_marker:?} + " " + {queue:?}, flush=True) -_nyx_record_broker_publish("NYX_SQS_LOG", {queue:?}, payload) -_loop.publish({queue:?}, payload) -for _env in _loop.receive_message({queue:?}, max_number=1): - _nyx_record_broker_event("NYX_SQS_LOG", "deliver", {queue:?}, _env.get("Body", "")) - _nyx_sqs_dispatch(_env) - if _loop.delete_message({queue:?}, _env.get("ReceiptHandle", "")): - _nyx_record_broker_event("NYX_SQS_LOG", "ack", {queue:?}, _env.get("ReceiptHandle", ""))"#, + r#"if not _nyx_try_real_sqs({queue:?}, payload, {handler:?}): + _loop = NyxSqsLoopback() + def _nyx_sqs_dispatch(envelope): + _h = getattr(_entry_mod, {handler:?}, None) + if _h is None: + print("NYX_HANDLER_NOT_FOUND: " + {handler:?}, file=sys.stderr, flush=True) + sys.exit(78) + _h(envelope) + _loop.subscribe({queue:?}, _nyx_sqs_dispatch) + print({publish_marker:?} + " " + {queue:?}, flush=True) + _nyx_record_broker_publish("NYX_SQS_LOG", {queue:?}, payload) + _loop.publish({queue:?}, payload) + for _env in _loop.receive_message({queue:?}, max_number=1): + _nyx_record_broker_event("NYX_SQS_LOG", "deliver", {queue:?}, _env.get("Body", "")) + _nyx_sqs_dispatch(_env) + if _loop.delete_message({queue:?}, _env.get("ReceiptHandle", "")): + _nyx_record_broker_event("NYX_SQS_LOG", "ack", {queue:?}, _env.get("ReceiptHandle", ""))"#, handler = handler, queue = queue, publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, @@ -1063,6 +1064,61 @@ def _nyx_record_broker_event(env_name, action, destination, body): def _nyx_record_broker_publish(env_name, destination, body): _nyx_record_broker_event(env_name, "publish", destination, body) +def _nyx_try_real_sqs(queue, body, handler_name): + endpoint = os.environ.get("NYX_SQS_ENDPOINT", "") + if not (endpoint.startswith("http://") or endpoint.startswith("https://")): + return False + try: + import boto3 + try: + from botocore.config import Config + _cfg = Config( + retries={{"max_attempts": 0}}, + connect_timeout=1, + read_timeout=2, + ) + except Exception: + _cfg = None + except Exception: + return False + _h = getattr(_entry_mod, handler_name, None) + if _h is None: + print("NYX_HANDLER_NOT_FOUND: " + handler_name, file=sys.stderr, flush=True) + sys.exit(78) + try: + _kwargs = {{ + "endpoint_url": endpoint, + "region_name": "us-east-1", + "aws_access_key_id": "nyx", + "aws_secret_access_key": "nyx", + }} + if _cfg is not None: + _kwargs["config"] = _cfg + _client = boto3.client("sqs", **_kwargs) + _queue_url = endpoint.rstrip("/") + "/" + str(queue).strip("/") + print({sqs_publish_marker:?} + " " + str(queue), flush=True) + _client.send_message(QueueUrl=_queue_url, MessageBody=str(body)) + _resp = _client.receive_message( + QueueUrl=_queue_url, + MaxNumberOfMessages=1, + WaitTimeSeconds=0, + AttributeNames=["ApproximateReceiveCount"], + ) + _messages = _resp.get("Messages", []) + if not _messages: + return False + for _msg in _messages: + _h(_msg) + _receipt = _msg.get("ReceiptHandle", "") + if _receipt: + _client.delete_message(QueueUrl=_queue_url, ReceiptHandle=_receipt) + return True + except SystemExit: + raise + except Exception as _e: + print(f"NYX_REAL_SQS_FALLBACK: {{type(_e).__name__}}: {{_e}}", file=sys.stderr, flush=True) + return False + try: {register_and_publish} except SystemExit as _e: @@ -1075,6 +1131,7 @@ except Exception as _e: pubsub_src = pubsub_src, rabbit_src = rabbit_src, register_and_publish = indent_lines(®ister_and_publish, " "), + sqs_publish_marker = crate::dynamic::stubs::SQS_PUBLISH_MARKER, ); HarnessSource { source: format!("{preamble}\n{body}\n{postamble}"), diff --git a/src/dynamic/sandbox/mod.rs b/src/dynamic/sandbox/mod.rs index 25fc999c..e6ba234c 100644 --- a/src/dynamic/sandbox/mod.rs +++ b/src/dynamic/sandbox/mod.rs @@ -898,6 +898,11 @@ fn rewrite_extra_env_for_container( { return (k.clone(), format!("{}/{idx}", docker::STUB_MOUNT_ROOT)); } + if matches!(k.as_str(), "NYX_HTTP_ENDPOINT" | "NYX_SQS_ENDPOINT") + && let Some(rest) = v.strip_prefix("http://127.0.0.1:") + { + return (k.clone(), format!("http://host-gateway:{rest}")); + } (k.clone(), v.clone()) }) .collect() @@ -2259,15 +2264,37 @@ mod tests { #[test] fn rewrite_extra_env_passes_unrelated_pairs_through() { + let extra = vec![("NYX_SQL_ENDPOINT".to_owned(), "/tmp/abc.db".to_owned())]; + let out = rewrite_extra_env_for_container(&extra, &[]); + assert_eq!(out, extra); + } + + #[test] + fn rewrite_extra_env_maps_loopback_http_stubs_to_host_gateway() { let extra = vec![ - ("NYX_SQL_ENDPOINT".to_owned(), "/tmp/abc.db".to_owned()), ( "NYX_HTTP_ENDPOINT".to_owned(), "http://127.0.0.1:12345".to_owned(), ), + ( + "NYX_SQS_ENDPOINT".to_owned(), + "http://127.0.0.1:23456/jobs".to_owned(), + ), ]; let out = rewrite_extra_env_for_container(&extra, &[]); - assert_eq!(out, extra); + assert_eq!( + out, + vec![ + ( + "NYX_HTTP_ENDPOINT".to_owned(), + "http://host-gateway:12345".to_owned(), + ), + ( + "NYX_SQS_ENDPOINT".to_owned(), + "http://host-gateway:23456/jobs".to_owned(), + ), + ] + ); } #[test] diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs index 64cb6ae9..1f2c40b1 100644 --- a/src/dynamic/stubs/broker.rs +++ b/src/dynamic/stubs/broker.rs @@ -10,10 +10,15 @@ //! can use. use super::{StubEvent, StubKind, StubProvider, monotonic_ns}; +use std::collections::{BTreeMap, VecDeque}; use std::fs::OpenOptions; -use std::io::{BufRead, BufReader, Write}; +use std::io::{BufRead, BufReader, Read, Write}; +use std::net::{TcpListener, TcpStream}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; use tempfile::TempDir; /// Broker-cap stub. Endpoint is a stable loopback URI; the companion @@ -25,6 +30,7 @@ pub struct BrokerStub { tempdir: Option, log_path: PathBuf, cursor: Mutex, + sqs_listener: Option, } impl BrokerStub { @@ -36,11 +42,17 @@ impl BrokerStub { .path() .join(format!("nyx_{}_stub.events.log", kind.tag())); std::fs::File::create(&log_path)?; + let sqs_listener = if kind == StubKind::Sqs { + start_sqs_listener(log_path.clone())? + } else { + None + }; Ok(Self { kind, tempdir: Some(tempdir), log_path, cursor: Mutex::new(0), + sqs_listener, }) } @@ -95,6 +107,9 @@ impl StubProvider for BrokerStub { } fn endpoint(&self) -> String { + if let Some(listener) = &self.sqs_listener { + return format!("http://127.0.0.1:{}", listener.port); + } format!("loopback://{}", self.kind.tag()) } @@ -167,10 +182,358 @@ fn parse_broker_log_line(line: &str) -> (&str, &str, &str) { impl Drop for BrokerStub { fn drop(&mut self) { + if let Some(listener) = &self.sqs_listener { + listener.shutdown.store(true, Ordering::Relaxed); + let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); + } self.tempdir.take(); } } +#[derive(Debug)] +struct SqsListener { + port: u16, + shutdown: Arc, +} + +#[derive(Debug, Clone)] +struct SqsMessage { + message_id: String, + receipt_handle: String, + body: String, + receive_count: u32, +} + +#[derive(Debug, Default)] +struct SqsState { + next_id: u64, + queues: BTreeMap>, + inflight: BTreeMap, +} + +fn start_sqs_listener(log_path: PathBuf) -> std::io::Result> { + let listener = match TcpListener::bind("127.0.0.1:0") { + Ok(listener) => listener, + Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None), + Err(e) => return Err(e), + }; + let port = listener.local_addr()?.port(); + let shutdown = Arc::new(AtomicBool::new(false)); + let state = Arc::new(Mutex::new(SqsState::default())); + let shutdown_clone = Arc::clone(&shutdown); + let state_clone = Arc::clone(&state); + std::thread::spawn(move || sqs_accept_loop(listener, shutdown_clone, state_clone, log_path)); + Ok(Some(SqsListener { port, shutdown })) +} + +fn sqs_accept_loop( + listener: TcpListener, + shutdown: Arc, + state: Arc>, + log_path: PathBuf, +) { + for stream in listener.incoming() { + if shutdown.load(Ordering::Relaxed) { + break; + } + let Ok(stream) = stream else { continue }; + let _ = stream.set_read_timeout(Some(Duration::from_secs(2))); + let _ = stream.set_write_timeout(Some(Duration::from_secs(2))); + let state = Arc::clone(&state); + let log_path = log_path.clone(); + std::thread::spawn(move || handle_sqs_connection(stream, state, &log_path)); + } +} + +fn handle_sqs_connection(mut stream: TcpStream, state: Arc>, log_path: &Path) { + let Some(req) = read_http_request(&stream) else { + return; + }; + let response = match handle_sqs_request(&req, state, log_path) { + Ok(body) => http_response(200, "OK", &body), + Err(body) => http_response(400, "Bad Request", &body), + }; + let _ = stream.write_all(response.as_bytes()); +} + +#[derive(Debug)] +struct HttpRequest { + path: String, + query: String, + body: String, +} + +fn read_http_request(stream: &TcpStream) -> Option { + let mut reader = BufReader::new(stream.try_clone().ok()?); + let mut request_line = String::new(); + if reader.read_line(&mut request_line).ok()? == 0 { + return None; + } + let mut parts = request_line.split_whitespace(); + let _method = parts.next()?; + let target = parts.next()?.to_owned(); + let (path, query) = split_target(&target); + + let mut content_length = 0_usize; + loop { + let mut line = String::new(); + if reader.read_line(&mut line).ok()? == 0 { + break; + } + let trimmed = line.trim_end_matches(['\r', '\n']); + if trimmed.is_empty() { + break; + } + if let Some((name, value)) = trimmed.split_once(':') + && name.eq_ignore_ascii_case("content-length") + { + content_length = value.trim().parse().unwrap_or(0); + } + } + + let mut body = vec![0u8; content_length.min(128 * 1024)]; + if !body.is_empty() { + reader.read_exact(&mut body).ok()?; + } + Some(HttpRequest { + path, + query, + body: String::from_utf8_lossy(&body).into_owned(), + }) +} + +fn split_target(target: &str) -> (String, String) { + let (path, query) = target.split_once('?').unwrap_or((target, "")); + (path.to_owned(), query.to_owned()) +} + +fn handle_sqs_request( + req: &HttpRequest, + state: Arc>, + log_path: &Path, +) -> Result { + let mut params = parse_form(&req.query); + params.extend(parse_form(&req.body)); + let action = params + .get("Action") + .or_else(|| params.get("X-Amz-Target")) + .map(|s| s.rsplit('.').next().unwrap_or(s).to_owned()) + .unwrap_or_default(); + match action.as_str() { + "SendMessage" => { + let queue = queue_name(¶ms, &req.path); + let body = params.get("MessageBody").cloned().unwrap_or_default(); + let mut guard = state.lock().map_err(|_| sqs_error("InternalError"))?; + guard.next_id += 1; + let message = SqsMessage { + message_id: format!("nyx-{:08}", guard.next_id), + receipt_handle: format!("rh-nyx-{:08}", guard.next_id), + body: body.clone(), + receive_count: 0, + }; + guard + .queues + .entry(queue.clone()) + .or_default() + .push_back(message.clone()); + let _ = append_broker_event(log_path, "publish", &queue, &body); + Ok(format!( + concat!( + "", + "{md5}", + "{message_id}", + "", + "nyx-sqs-request", + "" + ), + md5 = "00000000000000000000000000000000", + message_id = xml_escape(&message.message_id) + )) + } + "ReceiveMessage" => { + let queue = queue_name(¶ms, &req.path); + let max_messages = params + .get("MaxNumberOfMessages") + .and_then(|v| v.parse::().ok()) + .unwrap_or(1) + .clamp(1, 10); + let mut guard = state.lock().map_err(|_| sqs_error("InternalError"))?; + let mut messages = Vec::new(); + for _ in 0..max_messages { + let Some(mut message) = guard.queues.entry(queue.clone()).or_default().pop_front() + else { + break; + }; + message.receive_count += 1; + let _ = append_broker_event(log_path, "deliver", &queue, &message.body); + guard.inflight.insert( + message.receipt_handle.clone(), + (queue.clone(), message.clone()), + ); + messages.push(message); + } + let mut body = String::from(""); + for message in messages { + body.push_str(""); + body.push_str(&format!( + "{}", + xml_escape(&message.message_id) + )); + body.push_str(&format!( + "{}", + xml_escape(&message.receipt_handle) + )); + body.push_str(&format!("{}", xml_escape(&message.body))); + body.push_str("ApproximateReceiveCount"); + body.push_str(&message.receive_count.to_string()); + body.push_str(""); + body.push_str(""); + } + body.push_str( + "nyx-sqs-request", + ); + Ok(body) + } + "DeleteMessage" => { + let queue = queue_name(¶ms, &req.path); + let receipt = params.get("ReceiptHandle").cloned().unwrap_or_default(); + if let Ok(mut guard) = state.lock() + && guard.inflight.remove(&receipt).is_some() + { + let _ = append_broker_event(log_path, "ack", &queue, &receipt); + } + Ok(String::from( + "nyx-sqs-request", + )) + } + "GetQueueUrl" => { + let queue = params + .get("QueueName") + .cloned() + .unwrap_or_else(|| queue_name(¶ms, &req.path)); + Ok(format!( + concat!( + "", + "http://127.0.0.1/{queue}", + "", + "nyx-sqs-request", + "" + ), + queue = xml_escape(&queue) + )) + } + _ => Err(sqs_error("InvalidAction")), + } +} + +fn http_response(status: u16, reason: &str, body: &str) -> String { + format!( + "HTTP/1.1 {status} {reason}\r\ncontent-type: text/xml\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{body}", + body.len() + ) +} + +fn sqs_error(code: &str) -> String { + format!( + "Sender{}{}nyx-sqs-request", + xml_escape(code), + xml_escape(code) + ) +} + +fn parse_form(input: &str) -> BTreeMap { + let mut out = BTreeMap::new(); + for pair in input.split('&') { + if pair.is_empty() { + continue; + } + let (key, value) = pair.split_once('=').unwrap_or((pair, "")); + out.insert(percent_decode(key), percent_decode(value)); + } + out +} + +fn percent_decode(input: &str) -> String { + let mut out = Vec::with_capacity(input.len()); + let bytes = input.as_bytes(); + let mut idx = 0; + while idx < bytes.len() { + match bytes[idx] { + b'+' => { + out.push(b' '); + idx += 1; + } + b'%' if idx + 2 < bytes.len() => { + let hi = hex_val(bytes[idx + 1]); + let lo = hex_val(bytes[idx + 2]); + if let (Some(hi), Some(lo)) = (hi, lo) { + out.push((hi << 4) | lo); + idx += 3; + } else { + out.push(bytes[idx]); + idx += 1; + } + } + b => { + out.push(b); + idx += 1; + } + } + } + String::from_utf8_lossy(&out).into_owned() +} + +fn hex_val(b: u8) -> Option { + match b { + b'0'..=b'9' => Some(b - b'0'), + b'a'..=b'f' => Some(b - b'a' + 10), + b'A'..=b'F' => Some(b - b'A' + 10), + _ => None, + } +} + +fn queue_name(params: &BTreeMap, path: &str) -> String { + if let Some(url) = params.get("QueueUrl") + && let Some(queue) = url.trim_end_matches('/').rsplit('/').next() + && !queue.is_empty() + { + return queue.to_owned(); + } + let path_queue = path.trim_matches('/'); + if !path_queue.is_empty() { + return path_queue.to_owned(); + } + "default".to_owned() +} + +fn xml_escape(input: &str) -> String { + input + .replace('&', "&") + .replace('<', "<") + .replace('>', ">") + .replace('"', """) + .replace('\'', "'") +} + +fn append_broker_event( + log_path: &Path, + action: &str, + destination: &str, + payload: &str, +) -> std::io::Result<()> { + let mut f = OpenOptions::new() + .append(true) + .create(true) + .open(log_path)?; + writeln!( + f, + "{}\t{}\t{}", + action.replace('\t', " "), + destination.replace('\t', " "), + payload + ) +} + #[cfg(test)] mod tests { use super::*; @@ -203,6 +566,72 @@ mod tests { assert!(stub.drain_events().is_empty(), "drain cursor must advance"); } + #[test] + fn sqs_broker_exposes_http_query_emulator() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Sqs, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://sqs" { + return; + } + assert!( + endpoint.starts_with("http://127.0.0.1:"), + "SQS endpoint should be a real SDK-compatible HTTP endpoint, got {endpoint}" + ); + } + + #[test] + fn sqs_query_emulator_records_publish_deliver_ack() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Sqs, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://sqs" { + return; + } + let port: u16 = endpoint + .trim_start_matches("http://127.0.0.1:") + .parse() + .unwrap(); + let queue_url = format!("http://127.0.0.1:{port}/jobs"); + let send_body = format!( + "Action=SendMessage&QueueUrl={}&MessageBody=NYX%09PAYLOAD", + form_escape(&queue_url) + ); + let send = http_post(port, "/", &send_body); + assert!(send.contains(""), "{send}"); + + let receive_body = format!( + "Action=ReceiveMessage&QueueUrl={}&MaxNumberOfMessages=1", + form_escape(&queue_url) + ); + let receive = http_post(port, "/", &receive_body); + assert!(receive.contains("NYX\tPAYLOAD"), "{receive}"); + let receipt = receive + .split("") + .nth(1) + .and_then(|s| s.split("").next()) + .unwrap() + .to_owned(); + + let delete_body = format!( + "Action=DeleteMessage&QueueUrl={}&ReceiptHandle={}", + form_escape(&queue_url), + form_escape(&receipt) + ); + let delete = http_post(port, "/", &delete_body); + assert!(delete.contains(""), "{delete}"); + + let events = stub.drain_events(); + let actions: Vec<&str> = events + .iter() + .map(|ev| ev.detail.get("action").unwrap().as_str()) + .collect(); + assert_eq!(actions, vec!["publish", "deliver", "ack"]); + assert_eq!(events[0].detail.get("destination").unwrap(), "jobs"); + assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tPAYLOAD"); + assert_eq!(events[2].detail.get("payload").unwrap(), &receipt); + } + #[test] fn broker_drain_understands_delivery_and_ack_events() { let dir = TempDir::new().unwrap(); @@ -227,4 +656,30 @@ mod tests { assert_eq!(events[0].detail.get("action").unwrap(), "publish"); assert_eq!(events[0].detail.get("payload").unwrap(), "legacy payload"); } + + fn http_post(port: u16, path: &str, body: &str) -> String { + let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap(); + let req = format!( + "POST {path} HTTP/1.1\r\nhost: 127.0.0.1:{port}\r\ncontent-type: application/x-www-form-urlencoded\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{body}", + body.len() + ); + s.write_all(req.as_bytes()).unwrap(); + let mut out = String::new(); + s.read_to_string(&mut out).unwrap(); + out + } + + fn form_escape(input: &str) -> String { + let mut out = String::new(); + for b in input.bytes() { + match b { + b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => { + out.push(b as char) + } + b' ' => out.push('+'), + b => out.push_str(&format!("%{b:02X}")), + } + } + out + } } diff --git a/tests/message_handler_corpus.rs b/tests/message_handler_corpus.rs index de483140..49712fe8 100644 --- a/tests/message_handler_corpus.rs +++ b/tests/message_handler_corpus.rs @@ -208,6 +208,11 @@ fn message_handler_node_uses_sqs_loopback() { let spec = make_spec(Lang::JavaScript, "jobs", "handler", entry_file("sqs_node")); let h = lang::emit(&spec).expect("emit ok"); assert!(h.source.contains("NyxSqsLoopback")); + assert!(h.source.contains("_nyxTryRealSqs")); + assert!(h.source.contains("@aws-sdk/client-sqs")); + assert!(h.source.contains("SendMessageCommand")); + assert!(h.source.contains("ReceiveMessageCommand")); + assert!(h.source.contains("DeleteMessageCommand")); assert!(h.source.contains("receiveMessage")); assert!(h.source.contains("deleteMessage")); assert!(h.source.contains("'deliver'")); @@ -217,6 +222,46 @@ fn message_handler_node_uses_sqs_loopback() { assert!(h.source.contains("_nyxRecordBrokerPublish")); } +#[test] +fn message_handler_python_sqs_tries_real_boto3_client_first() { + let spec = make_spec_with_adapter( + Lang::Python, + "jobs", + "handler", + entry_file("sqs_python"), + "sqs-python", + ); + let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("_nyx_try_real_sqs")); + assert!(h.source.contains("boto3.client(\"sqs\"")); + assert!(h.source.contains("send_message")); + assert!(h.source.contains("receive_message")); + assert!(h.source.contains("delete_message")); + assert!(h.source.contains("NyxSqsLoopback")); +} + +#[test] +fn message_handler_java_sqs_tries_real_aws_sdk_client_first() { + let spec = make_spec_with_adapter( + Lang::Java, + "jobs", + "onMessage", + entry_file("sqs_java"), + "sqs-java", + ); + let h = lang::emit(&spec).expect("emit ok"); + assert!(h.source.contains("nyxTryRealSqs")); + assert!( + h.source + .contains("software.amazon.awssdk.services.sqs.SqsClient") + ); + assert!(h.source.contains("SendMessageRequest")); + assert!(h.source.contains("ReceiveMessageRequest")); + assert!(h.source.contains("DeleteMessageRequest")); + assert!(h.command.iter().any(|arg| arg == ".:lib/*")); + assert!(h.source.contains("NyxSqsLoopback")); +} + #[test] fn message_handler_go_uses_nyx_handlers_registry() { let spec = make_spec(Lang::Go, "my-sub", "OnMessage", entry_file("pubsub_go"));