**refactor(dynamic): add HTTP emulators for Pubsub, Rabbit, and NATS with publish/deliver/ack logic, extend event recording, endpoint rewriting, and SDK compatibility across Java, Go, Python, and Rust**

This commit is contained in:
elipeter 2026-05-27 11:29:07 -05:00
parent 57d3677bd4
commit a55849f1ca
7 changed files with 729 additions and 65 deletions

View file

@ -32,6 +32,7 @@ pub struct BrokerStub {
cursor: Mutex<u64>,
kafka_listener: Option<KafkaListener>,
sqs_listener: Option<SqsListener>,
http_listener: Option<HttpBrokerListener>,
}
impl BrokerStub {
@ -53,6 +54,12 @@ impl BrokerStub {
} else {
None
};
let http_listener = if matches!(kind, StubKind::Pubsub | StubKind::Rabbit | StubKind::Nats)
{
start_http_broker_listener(kind, log_path.clone())?
} else {
None
};
Ok(Self {
kind,
tempdir: Some(tempdir),
@ -60,6 +67,7 @@ impl BrokerStub {
cursor: Mutex::new(0),
kafka_listener,
sqs_listener,
http_listener,
})
}
@ -120,6 +128,9 @@ impl StubProvider for BrokerStub {
if let Some(listener) = &self.sqs_listener {
return format!("http://127.0.0.1:{}", listener.port);
}
if let Some(listener) = &self.http_listener {
return format!("http://127.0.0.1:{}", listener.port);
}
format!("loopback://{}", self.kind.tag())
}
@ -200,6 +211,10 @@ impl Drop for BrokerStub {
listener.shutdown.store(true, Ordering::Relaxed);
let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port));
}
if let Some(listener) = &self.http_listener {
listener.shutdown.store(true, Ordering::Relaxed);
let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port));
}
self.tempdir.take();
}
}
@ -425,6 +440,7 @@ fn handle_sqs_connection(mut stream: TcpStream, state: Arc<Mutex<SqsState>>, log
#[derive(Debug)]
struct HttpRequest {
method: String,
path: String,
query: String,
body: String,
@ -437,7 +453,7 @@ fn read_http_request(stream: &TcpStream) -> Option<HttpRequest> {
return None;
}
let mut parts = request_line.split_whitespace();
let _method = parts.next()?;
let method = parts.next()?.to_owned();
let target = parts.next()?.to_owned();
let (path, query) = split_target(&target);
@ -463,12 +479,202 @@ fn read_http_request(stream: &TcpStream) -> Option<HttpRequest> {
reader.read_exact(&mut body).ok()?;
}
Some(HttpRequest {
method,
path,
query,
body: String::from_utf8_lossy(&body).into_owned(),
})
}
#[derive(Debug)]
struct HttpBrokerListener {
port: u16,
shutdown: Arc<AtomicBool>,
}
#[derive(Debug, Clone)]
struct HttpBrokerMessage {
id: String,
payload: String,
}
#[derive(Debug, Default)]
struct HttpBrokerState {
next_id: u64,
streams: BTreeMap<String, VecDeque<HttpBrokerMessage>>,
inflight: BTreeMap<String, (String, HttpBrokerMessage)>,
}
fn start_http_broker_listener(
kind: StubKind,
log_path: PathBuf,
) -> std::io::Result<Option<HttpBrokerListener>> {
let listener = match TcpListener::bind("127.0.0.1:0") {
Ok(listener) => listener,
Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None),
Err(e) => return Err(e),
};
let port = listener.local_addr()?.port();
let shutdown = Arc::new(AtomicBool::new(false));
let state = Arc::new(Mutex::new(HttpBrokerState::default()));
let shutdown_clone = Arc::clone(&shutdown);
let state_clone = Arc::clone(&state);
std::thread::spawn(move || {
http_broker_accept_loop(listener, shutdown_clone, kind, state_clone, log_path)
});
Ok(Some(HttpBrokerListener { port, shutdown }))
}
fn http_broker_accept_loop(
listener: TcpListener,
shutdown: Arc<AtomicBool>,
kind: StubKind,
state: Arc<Mutex<HttpBrokerState>>,
log_path: PathBuf,
) {
for stream in listener.incoming() {
if shutdown.load(Ordering::Relaxed) {
break;
}
let Ok(stream) = stream else { continue };
let _ = stream.set_read_timeout(Some(Duration::from_secs(2)));
let _ = stream.set_write_timeout(Some(Duration::from_secs(2)));
let state = Arc::clone(&state);
let log_path = log_path.clone();
std::thread::spawn(move || handle_http_broker_connection(stream, kind, state, &log_path));
}
}
fn handle_http_broker_connection(
mut stream: TcpStream,
kind: StubKind,
state: Arc<Mutex<HttpBrokerState>>,
log_path: &Path,
) {
let Some(req) = read_http_request(&stream) else {
return;
};
let response = match handle_http_broker_request(kind, &req, state, log_path) {
Ok(body) => http_response_with_type(200, "OK", "application/json", &body),
Err(body) => http_response_with_type(400, "Bad Request", "application/json", &body),
};
let _ = stream.write_all(response.as_bytes());
}
fn handle_http_broker_request(
kind: StubKind,
req: &HttpRequest,
state: Arc<Mutex<HttpBrokerState>>,
log_path: &Path,
) -> Result<String, String> {
let Some((destination, action)) = http_broker_path_parts(kind, &req.path) else {
return Err(json_error("invalid broker stub path"));
};
match action.as_str() {
"messages" if req.method.eq_ignore_ascii_case("GET") => {
let params = parse_form(&req.query);
let max_messages = params
.get("max")
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(1)
.clamp(1, 100);
let mut guard = state.lock().map_err(|_| json_error("internal error"))?;
let mut messages = Vec::new();
for _ in 0..max_messages {
let Some(message) = guard
.streams
.entry(destination.clone())
.or_default()
.pop_front()
else {
break;
};
let _ = append_broker_event(log_path, "deliver", &destination, &message.payload);
guard
.inflight
.insert(message.id.clone(), (destination.clone(), message.clone()));
messages.push(http_broker_message_json(kind, &destination, &message));
}
Ok(serde_json::json!({ "messages": messages }).to_string())
}
"messages" => {
let mut guard = state.lock().map_err(|_| json_error("internal error"))?;
guard.next_id += 1;
let id = format!("nyx-{:08}", guard.next_id);
let message = HttpBrokerMessage {
id: id.clone(),
payload: req.body.clone(),
};
guard
.streams
.entry(destination.clone())
.or_default()
.push_back(message);
let _ = append_broker_event(log_path, "publish", &destination, &req.body);
Ok(serde_json::json!({ "id": id }).to_string())
}
"ack" => {
let params = parse_form(&req.body);
let ack_id = params
.get("ack_id")
.or_else(|| params.get("id"))
.cloned()
.unwrap_or_default();
if let Ok(mut guard) = state.lock()
&& (ack_id.is_empty() || guard.inflight.remove(&ack_id).is_some())
{
let _ = append_broker_event(log_path, "ack", &destination, &ack_id);
}
Ok(serde_json::json!({ "acked": true }).to_string())
}
_ => Err(json_error("invalid broker stub action")),
}
}
fn http_broker_path_parts(kind: StubKind, path: &str) -> Option<(String, String)> {
let expected_root = match kind {
StubKind::Pubsub => "topics",
StubKind::Rabbit => "queues",
StubKind::Nats => "subjects",
_ => return None,
};
let mut parts = path.trim_matches('/').split('/');
if parts.next()? != expected_root {
return None;
}
let destination = parts.next().map(percent_decode)?;
let action = parts.next()?.to_owned();
if destination.is_empty() || parts.next().is_some() {
return None;
}
Some((destination, action))
}
fn http_broker_message_json(
kind: StubKind,
destination: &str,
message: &HttpBrokerMessage,
) -> serde_json::Value {
match kind {
StubKind::Pubsub => serde_json::json!({
"id": &message.id,
"ack_id": &message.id,
"data": &message.payload
}),
StubKind::Rabbit => serde_json::json!({
"delivery_tag": &message.id,
"body": &message.payload
}),
StubKind::Nats => serde_json::json!({
"subject": destination,
"ack_id": &message.id,
"data": &message.payload,
"reply": ""
}),
_ => serde_json::json!({}),
}
}
fn split_target(target: &str) -> (String, String) {
let (path, query) = target.split_once('?').unwrap_or((target, ""));
(path.to_owned(), query.to_owned())
@ -769,6 +975,22 @@ mod tests {
);
}
#[test]
fn remaining_brokers_expose_http_emulators() {
for kind in [StubKind::Pubsub, StubKind::Rabbit, StubKind::Nats] {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(kind, dir.path()).unwrap();
let endpoint = stub.endpoint();
if endpoint == format!("loopback://{}", kind.tag()) {
continue;
}
assert!(
endpoint.starts_with("http://127.0.0.1:"),
"{kind:?} endpoint should be a host-side HTTP emulator, got {endpoint}"
);
}
}
#[test]
fn kafka_http_emulator_records_publish_deliver_ack() {
let dir = TempDir::new().unwrap();
@ -853,6 +1075,69 @@ mod tests {
assert_eq!(events[2].detail.get("payload").unwrap(), &receipt);
}
#[test]
fn remaining_http_broker_emulators_record_publish_deliver_ack() {
let cases = [
(StubKind::Pubsub, "topics", "projects/p/topics/orders"),
(StubKind::Rabbit, "queues", "work"),
(StubKind::Nats, "subjects", "events"),
];
for (kind, root, destination) in cases {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(kind, dir.path()).unwrap();
let endpoint = stub.endpoint();
if endpoint == format!("loopback://{}", kind.tag()) {
continue;
}
let port: u16 = endpoint
.trim_start_matches("http://127.0.0.1:")
.parse()
.unwrap();
let escaped_destination = form_escape(destination);
let send = http_post(
port,
&format!("/{root}/{escaped_destination}/messages"),
"NYX\tPAYLOAD",
);
assert!(send.contains(r#""id":"nyx-00000001""#), "{send}");
let receive = http_get(
port,
&format!("/{root}/{escaped_destination}/messages?max=1"),
);
let parsed: serde_json::Value = serde_json::from_str(response_body(&receive)).unwrap();
let message = parsed["messages"][0].as_object().unwrap();
let payload = message
.get("data")
.or_else(|| message.get("body"))
.and_then(|v| v.as_str())
.unwrap();
assert_eq!(payload, "NYX\tPAYLOAD");
let ack_id = message
.get("ack_id")
.or_else(|| message.get("delivery_tag"))
.and_then(|v| v.as_str())
.unwrap();
let ack = http_post(
port,
&format!("/{root}/{escaped_destination}/ack"),
&format!("ack_id={}", form_escape(ack_id)),
);
assert!(ack.contains(r#""acked":true"#), "{ack}");
let events = stub.drain_events();
let actions: Vec<&str> = events
.iter()
.map(|ev| ev.detail.get("action").unwrap().as_str())
.collect();
assert_eq!(actions, vec!["publish", "deliver", "ack"], "{kind:?}");
assert_eq!(events[0].detail.get("destination").unwrap(), destination);
assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tPAYLOAD");
assert_eq!(events[2].detail.get("payload").unwrap(), ack_id);
}
}
#[test]
fn broker_drain_understands_delivery_and_ack_events() {
let dir = TempDir::new().unwrap();
@ -900,6 +1185,10 @@ mod tests {
out
}
fn response_body(response: &str) -> &str {
response.split("\r\n\r\n").nth(1).unwrap_or("")
}
fn form_escape(input: &str) -> String {
let mut out = String::new();
for b in input.bytes() {