refactor(dynamic): add SQS loopback HTTP emulator with real SDK compatibility, extend stub event recording and endpoint rewriting logic across Java and Python

This commit is contained in:
elipeter 2026-05-27 09:34:02 -05:00
parent 0903231189
commit 433036aead
6 changed files with 765 additions and 39 deletions

View file

@ -10,10 +10,15 @@
//! can use.
use super::{StubEvent, StubKind, StubProvider, monotonic_ns};
use std::collections::{BTreeMap, VecDeque};
use std::fs::OpenOptions;
use std::io::{BufRead, BufReader, Write};
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tempfile::TempDir;
/// Broker-cap stub. Endpoint is a stable loopback URI; the companion
@ -25,6 +30,7 @@ pub struct BrokerStub {
tempdir: Option<TempDir>,
log_path: PathBuf,
cursor: Mutex<u64>,
sqs_listener: Option<SqsListener>,
}
impl BrokerStub {
@ -36,11 +42,17 @@ impl BrokerStub {
.path()
.join(format!("nyx_{}_stub.events.log", kind.tag()));
std::fs::File::create(&log_path)?;
let sqs_listener = if kind == StubKind::Sqs {
start_sqs_listener(log_path.clone())?
} else {
None
};
Ok(Self {
kind,
tempdir: Some(tempdir),
log_path,
cursor: Mutex::new(0),
sqs_listener,
})
}
@ -95,6 +107,9 @@ impl StubProvider for BrokerStub {
}
fn endpoint(&self) -> String {
if let Some(listener) = &self.sqs_listener {
return format!("http://127.0.0.1:{}", listener.port);
}
format!("loopback://{}", self.kind.tag())
}
@ -167,10 +182,358 @@ fn parse_broker_log_line(line: &str) -> (&str, &str, &str) {
impl Drop for BrokerStub {
fn drop(&mut self) {
if let Some(listener) = &self.sqs_listener {
listener.shutdown.store(true, Ordering::Relaxed);
let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port));
}
self.tempdir.take();
}
}
#[derive(Debug)]
struct SqsListener {
port: u16,
shutdown: Arc<AtomicBool>,
}
#[derive(Debug, Clone)]
struct SqsMessage {
message_id: String,
receipt_handle: String,
body: String,
receive_count: u32,
}
#[derive(Debug, Default)]
struct SqsState {
next_id: u64,
queues: BTreeMap<String, VecDeque<SqsMessage>>,
inflight: BTreeMap<String, (String, SqsMessage)>,
}
fn start_sqs_listener(log_path: PathBuf) -> std::io::Result<Option<SqsListener>> {
let listener = match TcpListener::bind("127.0.0.1:0") {
Ok(listener) => listener,
Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None),
Err(e) => return Err(e),
};
let port = listener.local_addr()?.port();
let shutdown = Arc::new(AtomicBool::new(false));
let state = Arc::new(Mutex::new(SqsState::default()));
let shutdown_clone = Arc::clone(&shutdown);
let state_clone = Arc::clone(&state);
std::thread::spawn(move || sqs_accept_loop(listener, shutdown_clone, state_clone, log_path));
Ok(Some(SqsListener { port, shutdown }))
}
fn sqs_accept_loop(
listener: TcpListener,
shutdown: Arc<AtomicBool>,
state: Arc<Mutex<SqsState>>,
log_path: PathBuf,
) {
for stream in listener.incoming() {
if shutdown.load(Ordering::Relaxed) {
break;
}
let Ok(stream) = stream else { continue };
let _ = stream.set_read_timeout(Some(Duration::from_secs(2)));
let _ = stream.set_write_timeout(Some(Duration::from_secs(2)));
let state = Arc::clone(&state);
let log_path = log_path.clone();
std::thread::spawn(move || handle_sqs_connection(stream, state, &log_path));
}
}
fn handle_sqs_connection(mut stream: TcpStream, state: Arc<Mutex<SqsState>>, log_path: &Path) {
let Some(req) = read_http_request(&stream) else {
return;
};
let response = match handle_sqs_request(&req, state, log_path) {
Ok(body) => http_response(200, "OK", &body),
Err(body) => http_response(400, "Bad Request", &body),
};
let _ = stream.write_all(response.as_bytes());
}
#[derive(Debug)]
struct HttpRequest {
path: String,
query: String,
body: String,
}
fn read_http_request(stream: &TcpStream) -> Option<HttpRequest> {
let mut reader = BufReader::new(stream.try_clone().ok()?);
let mut request_line = String::new();
if reader.read_line(&mut request_line).ok()? == 0 {
return None;
}
let mut parts = request_line.split_whitespace();
let _method = parts.next()?;
let target = parts.next()?.to_owned();
let (path, query) = split_target(&target);
let mut content_length = 0_usize;
loop {
let mut line = String::new();
if reader.read_line(&mut line).ok()? == 0 {
break;
}
let trimmed = line.trim_end_matches(['\r', '\n']);
if trimmed.is_empty() {
break;
}
if let Some((name, value)) = trimmed.split_once(':')
&& name.eq_ignore_ascii_case("content-length")
{
content_length = value.trim().parse().unwrap_or(0);
}
}
let mut body = vec![0u8; content_length.min(128 * 1024)];
if !body.is_empty() {
reader.read_exact(&mut body).ok()?;
}
Some(HttpRequest {
path,
query,
body: String::from_utf8_lossy(&body).into_owned(),
})
}
fn split_target(target: &str) -> (String, String) {
let (path, query) = target.split_once('?').unwrap_or((target, ""));
(path.to_owned(), query.to_owned())
}
fn handle_sqs_request(
req: &HttpRequest,
state: Arc<Mutex<SqsState>>,
log_path: &Path,
) -> Result<String, String> {
let mut params = parse_form(&req.query);
params.extend(parse_form(&req.body));
let action = params
.get("Action")
.or_else(|| params.get("X-Amz-Target"))
.map(|s| s.rsplit('.').next().unwrap_or(s).to_owned())
.unwrap_or_default();
match action.as_str() {
"SendMessage" => {
let queue = queue_name(&params, &req.path);
let body = params.get("MessageBody").cloned().unwrap_or_default();
let mut guard = state.lock().map_err(|_| sqs_error("InternalError"))?;
guard.next_id += 1;
let message = SqsMessage {
message_id: format!("nyx-{:08}", guard.next_id),
receipt_handle: format!("rh-nyx-{:08}", guard.next_id),
body: body.clone(),
receive_count: 0,
};
guard
.queues
.entry(queue.clone())
.or_default()
.push_back(message.clone());
let _ = append_broker_event(log_path, "publish", &queue, &body);
Ok(format!(
concat!(
"<SendMessageResponse><SendMessageResult>",
"<MD5OfMessageBody>{md5}</MD5OfMessageBody>",
"<MessageId>{message_id}</MessageId>",
"</SendMessageResult><ResponseMetadata>",
"<RequestId>nyx-sqs-request</RequestId>",
"</ResponseMetadata></SendMessageResponse>"
),
md5 = "00000000000000000000000000000000",
message_id = xml_escape(&message.message_id)
))
}
"ReceiveMessage" => {
let queue = queue_name(&params, &req.path);
let max_messages = params
.get("MaxNumberOfMessages")
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(1)
.clamp(1, 10);
let mut guard = state.lock().map_err(|_| sqs_error("InternalError"))?;
let mut messages = Vec::new();
for _ in 0..max_messages {
let Some(mut message) = guard.queues.entry(queue.clone()).or_default().pop_front()
else {
break;
};
message.receive_count += 1;
let _ = append_broker_event(log_path, "deliver", &queue, &message.body);
guard.inflight.insert(
message.receipt_handle.clone(),
(queue.clone(), message.clone()),
);
messages.push(message);
}
let mut body = String::from("<ReceiveMessageResponse><ReceiveMessageResult>");
for message in messages {
body.push_str("<Message>");
body.push_str(&format!(
"<MessageId>{}</MessageId>",
xml_escape(&message.message_id)
));
body.push_str(&format!(
"<ReceiptHandle>{}</ReceiptHandle>",
xml_escape(&message.receipt_handle)
));
body.push_str(&format!("<Body>{}</Body>", xml_escape(&message.body)));
body.push_str("<Attribute><Name>ApproximateReceiveCount</Name><Value>");
body.push_str(&message.receive_count.to_string());
body.push_str("</Value></Attribute>");
body.push_str("</Message>");
}
body.push_str(
"</ReceiveMessageResult><ResponseMetadata><RequestId>nyx-sqs-request</RequestId></ResponseMetadata></ReceiveMessageResponse>",
);
Ok(body)
}
"DeleteMessage" => {
let queue = queue_name(&params, &req.path);
let receipt = params.get("ReceiptHandle").cloned().unwrap_or_default();
if let Ok(mut guard) = state.lock()
&& guard.inflight.remove(&receipt).is_some()
{
let _ = append_broker_event(log_path, "ack", &queue, &receipt);
}
Ok(String::from(
"<DeleteMessageResponse><ResponseMetadata><RequestId>nyx-sqs-request</RequestId></ResponseMetadata></DeleteMessageResponse>",
))
}
"GetQueueUrl" => {
let queue = params
.get("QueueName")
.cloned()
.unwrap_or_else(|| queue_name(&params, &req.path));
Ok(format!(
concat!(
"<GetQueueUrlResponse><GetQueueUrlResult>",
"<QueueUrl>http://127.0.0.1/{queue}</QueueUrl>",
"</GetQueueUrlResult><ResponseMetadata>",
"<RequestId>nyx-sqs-request</RequestId>",
"</ResponseMetadata></GetQueueUrlResponse>"
),
queue = xml_escape(&queue)
))
}
_ => Err(sqs_error("InvalidAction")),
}
}
fn http_response(status: u16, reason: &str, body: &str) -> String {
format!(
"HTTP/1.1 {status} {reason}\r\ncontent-type: text/xml\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{body}",
body.len()
)
}
fn sqs_error(code: &str) -> String {
format!(
"<ErrorResponse><Error><Type>Sender</Type><Code>{}</Code><Message>{}</Message></Error><RequestId>nyx-sqs-request</RequestId></ErrorResponse>",
xml_escape(code),
xml_escape(code)
)
}
fn parse_form(input: &str) -> BTreeMap<String, String> {
let mut out = BTreeMap::new();
for pair in input.split('&') {
if pair.is_empty() {
continue;
}
let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
out.insert(percent_decode(key), percent_decode(value));
}
out
}
fn percent_decode(input: &str) -> String {
let mut out = Vec::with_capacity(input.len());
let bytes = input.as_bytes();
let mut idx = 0;
while idx < bytes.len() {
match bytes[idx] {
b'+' => {
out.push(b' ');
idx += 1;
}
b'%' if idx + 2 < bytes.len() => {
let hi = hex_val(bytes[idx + 1]);
let lo = hex_val(bytes[idx + 2]);
if let (Some(hi), Some(lo)) = (hi, lo) {
out.push((hi << 4) | lo);
idx += 3;
} else {
out.push(bytes[idx]);
idx += 1;
}
}
b => {
out.push(b);
idx += 1;
}
}
}
String::from_utf8_lossy(&out).into_owned()
}
fn hex_val(b: u8) -> Option<u8> {
match b {
b'0'..=b'9' => Some(b - b'0'),
b'a'..=b'f' => Some(b - b'a' + 10),
b'A'..=b'F' => Some(b - b'A' + 10),
_ => None,
}
}
fn queue_name(params: &BTreeMap<String, String>, path: &str) -> String {
if let Some(url) = params.get("QueueUrl")
&& let Some(queue) = url.trim_end_matches('/').rsplit('/').next()
&& !queue.is_empty()
{
return queue.to_owned();
}
let path_queue = path.trim_matches('/');
if !path_queue.is_empty() {
return path_queue.to_owned();
}
"default".to_owned()
}
fn xml_escape(input: &str) -> String {
input
.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
.replace('"', "&quot;")
.replace('\'', "&apos;")
}
fn append_broker_event(
log_path: &Path,
action: &str,
destination: &str,
payload: &str,
) -> std::io::Result<()> {
let mut f = OpenOptions::new()
.append(true)
.create(true)
.open(log_path)?;
writeln!(
f,
"{}\t{}\t{}",
action.replace('\t', " "),
destination.replace('\t', " "),
payload
)
}
#[cfg(test)]
mod tests {
use super::*;
@ -203,6 +566,72 @@ mod tests {
assert!(stub.drain_events().is_empty(), "drain cursor must advance");
}
#[test]
fn sqs_broker_exposes_http_query_emulator() {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(StubKind::Sqs, dir.path()).unwrap();
let endpoint = stub.endpoint();
if endpoint == "loopback://sqs" {
return;
}
assert!(
endpoint.starts_with("http://127.0.0.1:"),
"SQS endpoint should be a real SDK-compatible HTTP endpoint, got {endpoint}"
);
}
#[test]
fn sqs_query_emulator_records_publish_deliver_ack() {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(StubKind::Sqs, dir.path()).unwrap();
let endpoint = stub.endpoint();
if endpoint == "loopback://sqs" {
return;
}
let port: u16 = endpoint
.trim_start_matches("http://127.0.0.1:")
.parse()
.unwrap();
let queue_url = format!("http://127.0.0.1:{port}/jobs");
let send_body = format!(
"Action=SendMessage&QueueUrl={}&MessageBody=NYX%09PAYLOAD",
form_escape(&queue_url)
);
let send = http_post(port, "/", &send_body);
assert!(send.contains("<SendMessageResponse>"), "{send}");
let receive_body = format!(
"Action=ReceiveMessage&QueueUrl={}&MaxNumberOfMessages=1",
form_escape(&queue_url)
);
let receive = http_post(port, "/", &receive_body);
assert!(receive.contains("<Body>NYX\tPAYLOAD</Body>"), "{receive}");
let receipt = receive
.split("<ReceiptHandle>")
.nth(1)
.and_then(|s| s.split("</ReceiptHandle>").next())
.unwrap()
.to_owned();
let delete_body = format!(
"Action=DeleteMessage&QueueUrl={}&ReceiptHandle={}",
form_escape(&queue_url),
form_escape(&receipt)
);
let delete = http_post(port, "/", &delete_body);
assert!(delete.contains("<DeleteMessageResponse>"), "{delete}");
let events = stub.drain_events();
let actions: Vec<&str> = events
.iter()
.map(|ev| ev.detail.get("action").unwrap().as_str())
.collect();
assert_eq!(actions, vec!["publish", "deliver", "ack"]);
assert_eq!(events[0].detail.get("destination").unwrap(), "jobs");
assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tPAYLOAD");
assert_eq!(events[2].detail.get("payload").unwrap(), &receipt);
}
#[test]
fn broker_drain_understands_delivery_and_ack_events() {
let dir = TempDir::new().unwrap();
@ -227,4 +656,30 @@ mod tests {
assert_eq!(events[0].detail.get("action").unwrap(), "publish");
assert_eq!(events[0].detail.get("payload").unwrap(), "legacy payload");
}
fn http_post(port: u16, path: &str, body: &str) -> String {
let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
let req = format!(
"POST {path} HTTP/1.1\r\nhost: 127.0.0.1:{port}\r\ncontent-type: application/x-www-form-urlencoded\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{body}",
body.len()
);
s.write_all(req.as_bytes()).unwrap();
let mut out = String::new();
s.read_to_string(&mut out).unwrap();
out
}
fn form_escape(input: &str) -> String {
let mut out = String::new();
for b in input.bytes() {
match b {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
out.push(b as char)
}
b' ' => out.push('+'),
b => out.push_str(&format!("%{b:02X}")),
}
}
out
}
}