**refactor(dynamic): add AMQP protocol emulator for Rabbit with publish/deliver/ack support, enhance endpoint handling, and extend test coverage**

This commit is contained in:
elipeter 2026-05-27 13:10:58 -05:00
parent d5c51c5d8a
commit fd5e1f3e89
2 changed files with 912 additions and 9 deletions

View file

@ -915,6 +915,11 @@ fn rewrite_extra_env_for_container(
{
return (k.clone(), format!("nats://host-gateway:{rest}"));
}
if k == "NYX_RABBIT_ENDPOINT"
&& let Some(rest) = v.strip_prefix("amqp://127.0.0.1:")
{
return (k.clone(), format!("amqp://host-gateway:{rest}"));
}
(k.clone(), v.clone())
})
.collect()
@ -2302,7 +2307,7 @@ mod tests {
),
(
"NYX_RABBIT_ENDPOINT".to_owned(),
"http://127.0.0.1:45678/queues".to_owned(),
"amqp://127.0.0.1:45678/%2f".to_owned(),
),
(
"NYX_NATS_ENDPOINT".to_owned(),
@ -2331,7 +2336,7 @@ mod tests {
),
(
"NYX_RABBIT_ENDPOINT".to_owned(),
"http://host-gateway:45678/queues".to_owned(),
"amqp://host-gateway:45678/%2f".to_owned(),
),
(
"NYX_NATS_ENDPOINT".to_owned(),

View file

@ -33,6 +33,7 @@ pub struct BrokerStub {
kafka_listener: Option<KafkaListener>,
sqs_listener: Option<SqsListener>,
http_listener: Option<HttpBrokerListener>,
rabbit_amqp_listener: Option<RabbitAmqpListener>,
nats_listener: Option<NatsListener>,
}
@ -60,6 +61,11 @@ impl BrokerStub {
} else {
None
};
let rabbit_amqp_listener = if kind == StubKind::Rabbit {
start_rabbit_amqp_listener(log_path.clone())?
} else {
None
};
let nats_listener = if kind == StubKind::Nats {
start_nats_listener(log_path.clone())?
} else {
@ -73,6 +79,7 @@ impl BrokerStub {
kafka_listener,
sqs_listener,
http_listener,
rabbit_amqp_listener,
nats_listener,
})
}
@ -134,6 +141,9 @@ impl StubProvider for BrokerStub {
if let Some(listener) = &self.sqs_listener {
return format!("http://127.0.0.1:{}", listener.port);
}
if let Some(listener) = &self.rabbit_amqp_listener {
return format!("amqp://127.0.0.1:{}/%2f", listener.port);
}
if let Some(listener) = &self.http_listener {
return format!("http://127.0.0.1:{}", listener.port);
}
@ -224,6 +234,10 @@ impl Drop for BrokerStub {
listener.shutdown.store(true, Ordering::Relaxed);
let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port));
}
if let Some(listener) = &self.rabbit_amqp_listener {
listener.shutdown.store(true, Ordering::Relaxed);
let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port));
}
if let Some(listener) = &self.nats_listener {
listener.shutdown.store(true, Ordering::Relaxed);
let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port));
@ -688,6 +702,668 @@ fn http_broker_message_json(
}
}
#[derive(Debug)]
struct RabbitAmqpListener {
port: u16,
shutdown: Arc<AtomicBool>,
}
#[derive(Debug, Default)]
struct RabbitAmqpState {
next_delivery_tag: u64,
next_consumer_tag: u64,
queues: BTreeMap<String, VecDeque<String>>,
inflight: BTreeMap<u64, (String, String)>,
consumers: BTreeMap<String, Vec<RabbitAmqpConsumer>>,
}
#[derive(Debug, Clone)]
struct RabbitAmqpConsumer {
consumer_tag: String,
channel: u16,
no_ack: bool,
writer: Arc<Mutex<TcpStream>>,
}
#[derive(Debug)]
struct AmqpFrame {
frame_type: u8,
channel: u16,
payload: Vec<u8>,
}
const AMQP_FRAME_METHOD: u8 = 1;
const AMQP_FRAME_HEADER: u8 = 2;
const AMQP_FRAME_BODY: u8 = 3;
const AMQP_FRAME_HEARTBEAT: u8 = 8;
const AMQP_FRAME_END: u8 = 0xce;
fn start_rabbit_amqp_listener(log_path: PathBuf) -> std::io::Result<Option<RabbitAmqpListener>> {
let listener = match TcpListener::bind("127.0.0.1:0") {
Ok(listener) => listener,
Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None),
Err(e) => return Err(e),
};
let port = listener.local_addr()?.port();
let shutdown = Arc::new(AtomicBool::new(false));
let state = Arc::new(Mutex::new(RabbitAmqpState::default()));
let shutdown_clone = Arc::clone(&shutdown);
let state_clone = Arc::clone(&state);
std::thread::spawn(move || {
rabbit_amqp_accept_loop(listener, shutdown_clone, state_clone, log_path)
});
Ok(Some(RabbitAmqpListener { port, shutdown }))
}
fn rabbit_amqp_accept_loop(
listener: TcpListener,
shutdown: Arc<AtomicBool>,
state: Arc<Mutex<RabbitAmqpState>>,
log_path: PathBuf,
) {
for stream in listener.incoming() {
if shutdown.load(Ordering::Relaxed) {
break;
}
let Ok(stream) = stream else { continue };
let _ = stream.set_read_timeout(Some(Duration::from_secs(5)));
let _ = stream.set_write_timeout(Some(Duration::from_secs(5)));
let state = Arc::clone(&state);
let log_path = log_path.clone();
std::thread::spawn(move || handle_rabbit_amqp_connection(stream, state, &log_path));
}
}
fn handle_rabbit_amqp_connection(
stream: TcpStream,
state: Arc<Mutex<RabbitAmqpState>>,
log_path: &Path,
) {
let Ok(mut writer) = stream.try_clone() else {
return;
};
let consumer_writer = match stream.try_clone() {
Ok(stream) => Arc::new(Mutex::new(stream)),
Err(_) => return,
};
let mut reader = BufReader::new(stream);
let mut protocol = [0_u8; 8];
if reader.read_exact(&mut protocol).is_err() || &protocol != b"AMQP\0\0\x09\x01" {
return;
}
if amqp_write_connection_start(&mut writer).is_err() {
return;
}
let mut owned_consumer_tags = Vec::new();
loop {
let Some(frame) = amqp_read_frame(&mut reader) else {
break;
};
if frame.frame_type == AMQP_FRAME_HEARTBEAT {
let _ = amqp_write_frame(&mut writer, AMQP_FRAME_HEARTBEAT, 0, &[]);
continue;
}
if frame.frame_type != AMQP_FRAME_METHOD {
continue;
}
let Some((class_id, method_id)) = amqp_method_id(&frame.payload) else {
break;
};
match (class_id, method_id) {
// connection.start-ok
(10, 11) => {
if amqp_write_connection_tune(&mut writer).is_err() {
break;
}
}
// connection.tune-ok
(10, 31) => {}
// connection.open
(10, 40) => {
if amqp_write_connection_open_ok(&mut writer).is_err() {
break;
}
}
// connection.close
(10, 50) => {
let _ = amqp_write_method(&mut writer, frame.channel, 10, 51, &[]);
break;
}
// channel.open
(20, 10) => {
let mut args = Vec::new();
amqp_push_longstr(&mut args, "");
if amqp_write_method(&mut writer, frame.channel, 20, 11, &args).is_err() {
break;
}
}
// channel.close
(20, 40) => {
if amqp_write_method(&mut writer, frame.channel, 20, 41, &[]).is_err() {
break;
}
}
// basic.qos
(60, 10) => {
if amqp_write_method(&mut writer, frame.channel, 60, 11, &[]).is_err() {
break;
}
}
// basic.consume
(60, 20) => {
let Some((queue, requested_tag, no_ack)) = amqp_basic_consume_args(&frame.payload)
else {
continue;
};
let queue = if queue.is_empty() {
"default".to_owned()
} else {
queue
};
let consumer_tag = if let Ok(mut guard) = state.lock() {
let tag = if requested_tag.is_empty() {
guard.next_consumer_tag += 1;
format!("nyx-consumer-{}", guard.next_consumer_tag)
} else {
requested_tag
};
guard
.consumers
.entry(queue)
.or_default()
.push(RabbitAmqpConsumer {
consumer_tag: tag.clone(),
channel: frame.channel,
no_ack,
writer: Arc::clone(&consumer_writer),
});
tag
} else {
requested_tag
};
owned_consumer_tags.push(consumer_tag.clone());
if amqp_write_basic_consume_ok(&mut writer, frame.channel, &consumer_tag).is_err() {
break;
}
}
// basic.cancel
(60, 30) => {
if let Some(consumer_tag) = amqp_basic_cancel_tag(&frame.payload) {
rabbit_amqp_remove_consumers(&state, &[consumer_tag.clone()]);
if amqp_write_basic_cancel_ok(&mut writer, frame.channel, &consumer_tag)
.is_err()
{
break;
}
}
}
// queue.declare
(50, 10) => {
let queue = amqp_queue_declare_name(&frame.payload)
.filter(|q| !q.is_empty())
.unwrap_or_else(|| "nyx-auto".to_owned());
let message_count = if let Ok(mut guard) = state.lock() {
guard.queues.entry(queue.clone()).or_default().len() as u32
} else {
0
};
if amqp_write_queue_declare_ok(&mut writer, frame.channel, &queue, message_count)
.is_err()
{
break;
}
}
// basic.publish
(60, 40) => {
let routing_key = amqp_basic_publish_routing_key(&frame.payload)
.filter(|q| !q.is_empty())
.unwrap_or_else(|| "default".to_owned());
let Some(body) = amqp_read_content_body(&mut reader, frame.channel) else {
break;
};
let payload = String::from_utf8_lossy(&body).into_owned();
if !rabbit_amqp_deliver_to_consumer(
&state,
log_path,
&routing_key,
payload.as_bytes(),
) && let Ok(mut guard) = state.lock()
{
guard
.queues
.entry(routing_key.clone())
.or_default()
.push_back(payload.clone());
}
let _ = append_broker_event(log_path, "publish", &routing_key, &payload);
}
// basic.get
(60, 70) => {
let queue = amqp_basic_get_queue(&frame.payload)
.filter(|q| !q.is_empty())
.unwrap_or_else(|| "default".to_owned());
let (delivery_tag, payload, remaining) = if let Ok(mut guard) = state.lock() {
let body = guard.queues.entry(queue.clone()).or_default().pop_front();
if let Some(body) = body {
guard.next_delivery_tag += 1;
let tag = guard.next_delivery_tag;
let remaining = guard.queues.get(&queue).map(VecDeque::len).unwrap_or(0);
guard.inflight.insert(tag, (queue.clone(), body.clone()));
(Some(tag), Some(body), remaining as u32)
} else {
(None, None, 0)
}
} else {
(None, None, 0)
};
if let (Some(tag), Some(payload)) = (delivery_tag, payload) {
let _ = append_broker_event(log_path, "deliver", &queue, &payload);
if amqp_write_basic_get_ok(
&mut writer,
frame.channel,
tag,
&queue,
remaining,
payload.as_bytes(),
)
.is_err()
{
break;
}
} else if amqp_write_basic_get_empty(&mut writer, frame.channel).is_err() {
break;
}
}
// basic.ack
(60, 80) => {
let Some((delivery_tag, multiple)) = amqp_basic_ack_tag(&frame.payload) else {
continue;
};
let mut acked = Vec::new();
if let Ok(mut guard) = state.lock() {
if multiple {
let tags: Vec<u64> = guard
.inflight
.keys()
.copied()
.filter(|tag| *tag <= delivery_tag)
.collect();
for tag in tags {
if let Some((queue, _payload)) = guard.inflight.remove(&tag) {
acked.push((queue, tag));
}
}
} else if let Some((queue, _payload)) = guard.inflight.remove(&delivery_tag) {
acked.push((queue, delivery_tag));
}
}
for (queue, tag) in acked {
let _ = append_broker_event(log_path, "ack", &queue, &tag.to_string());
}
}
_ => {}
}
}
rabbit_amqp_remove_consumers(&state, &owned_consumer_tags);
}
fn amqp_read_frame(reader: &mut BufReader<TcpStream>) -> Option<AmqpFrame> {
let mut header = [0_u8; 7];
reader.read_exact(&mut header).ok()?;
let frame_type = header[0];
let channel = u16::from_be_bytes([header[1], header[2]]);
let size = u32::from_be_bytes([header[3], header[4], header[5], header[6]]) as usize;
if size > 1024 * 1024 {
return None;
}
let mut payload = vec![0_u8; size];
if size > 0 {
reader.read_exact(&mut payload).ok()?;
}
let mut end = [0_u8; 1];
reader.read_exact(&mut end).ok()?;
if end[0] != AMQP_FRAME_END {
return None;
}
Some(AmqpFrame {
frame_type,
channel,
payload,
})
}
fn amqp_write_connection_start(writer: &mut TcpStream) -> std::io::Result<()> {
let mut args = vec![0, 9];
amqp_push_table_empty(&mut args);
amqp_push_longstr(&mut args, "PLAIN AMQPLAIN");
amqp_push_longstr(&mut args, "en_US");
amqp_write_method(writer, 0, 10, 10, &args)
}
fn amqp_write_connection_tune(writer: &mut TcpStream) -> std::io::Result<()> {
let mut args = Vec::new();
amqp_push_u16(&mut args, 2047);
amqp_push_u32(&mut args, 131_072);
amqp_push_u16(&mut args, 0);
amqp_write_method(writer, 0, 10, 30, &args)
}
fn amqp_write_connection_open_ok(writer: &mut TcpStream) -> std::io::Result<()> {
let mut args = Vec::new();
amqp_push_shortstr(&mut args, "");
amqp_write_method(writer, 0, 10, 41, &args)
}
fn amqp_write_queue_declare_ok(
writer: &mut TcpStream,
channel: u16,
queue: &str,
message_count: u32,
) -> std::io::Result<()> {
let mut args = Vec::new();
amqp_push_shortstr(&mut args, queue);
amqp_push_u32(&mut args, message_count);
amqp_push_u32(&mut args, 0);
amqp_write_method(writer, channel, 50, 11, &args)
}
fn amqp_write_basic_get_ok(
writer: &mut TcpStream,
channel: u16,
delivery_tag: u64,
routing_key: &str,
message_count: u32,
body: &[u8],
) -> std::io::Result<()> {
let mut args = Vec::new();
amqp_push_u64(&mut args, delivery_tag);
args.push(0);
amqp_push_shortstr(&mut args, "");
amqp_push_shortstr(&mut args, routing_key);
amqp_push_u32(&mut args, message_count);
amqp_write_method(writer, channel, 60, 71, &args)?;
amqp_write_content(writer, channel, body)
}
fn amqp_write_basic_get_empty(writer: &mut TcpStream, channel: u16) -> std::io::Result<()> {
let mut args = Vec::new();
amqp_push_shortstr(&mut args, "");
amqp_write_method(writer, channel, 60, 72, &args)
}
fn amqp_write_basic_consume_ok(
writer: &mut TcpStream,
channel: u16,
consumer_tag: &str,
) -> std::io::Result<()> {
let mut args = Vec::new();
amqp_push_shortstr(&mut args, consumer_tag);
amqp_write_method(writer, channel, 60, 21, &args)
}
fn amqp_write_basic_cancel_ok(
writer: &mut TcpStream,
channel: u16,
consumer_tag: &str,
) -> std::io::Result<()> {
let mut args = Vec::new();
amqp_push_shortstr(&mut args, consumer_tag);
amqp_write_method(writer, channel, 60, 31, &args)
}
fn amqp_write_basic_deliver(
writer: &mut TcpStream,
channel: u16,
consumer_tag: &str,
delivery_tag: u64,
routing_key: &str,
body: &[u8],
) -> std::io::Result<()> {
let mut args = Vec::new();
amqp_push_shortstr(&mut args, consumer_tag);
amqp_push_u64(&mut args, delivery_tag);
args.push(0);
amqp_push_shortstr(&mut args, "");
amqp_push_shortstr(&mut args, routing_key);
amqp_write_method(writer, channel, 60, 60, &args)?;
amqp_write_content(writer, channel, body)
}
fn amqp_write_content(writer: &mut TcpStream, channel: u16, body: &[u8]) -> std::io::Result<()> {
let mut header = Vec::new();
amqp_push_u16(&mut header, 60);
amqp_push_u16(&mut header, 0);
amqp_push_u64(&mut header, body.len() as u64);
amqp_push_u16(&mut header, 0);
amqp_write_frame(writer, AMQP_FRAME_HEADER, channel, &header)?;
amqp_write_frame(writer, AMQP_FRAME_BODY, channel, body)
}
fn amqp_write_method(
writer: &mut TcpStream,
channel: u16,
class_id: u16,
method_id: u16,
args: &[u8],
) -> std::io::Result<()> {
let mut payload = Vec::with_capacity(4 + args.len());
amqp_push_u16(&mut payload, class_id);
amqp_push_u16(&mut payload, method_id);
payload.extend_from_slice(args);
amqp_write_frame(writer, AMQP_FRAME_METHOD, channel, &payload)
}
fn amqp_write_frame(
writer: &mut TcpStream,
frame_type: u8,
channel: u16,
payload: &[u8],
) -> std::io::Result<()> {
writer.write_all(&[frame_type])?;
writer.write_all(&channel.to_be_bytes())?;
writer.write_all(&(payload.len() as u32).to_be_bytes())?;
writer.write_all(payload)?;
writer.write_all(&[AMQP_FRAME_END])
}
fn amqp_read_content_body(reader: &mut BufReader<TcpStream>, channel: u16) -> Option<Vec<u8>> {
let header = loop {
let frame = amqp_read_frame(reader)?;
if frame.frame_type == AMQP_FRAME_HEARTBEAT {
continue;
}
if frame.frame_type == AMQP_FRAME_HEADER && frame.channel == channel {
break frame;
}
return None;
};
if header.payload.len() < 12 {
return None;
}
let size = u64::from_be_bytes(header.payload[4..12].try_into().ok()?) as usize;
if size > 1024 * 1024 {
return None;
}
let mut body = Vec::with_capacity(size);
while body.len() < size {
let frame = amqp_read_frame(reader)?;
if frame.frame_type == AMQP_FRAME_HEARTBEAT {
continue;
}
if frame.frame_type != AMQP_FRAME_BODY || frame.channel != channel {
return None;
}
body.extend_from_slice(&frame.payload);
}
body.truncate(size);
Some(body)
}
fn amqp_method_id(payload: &[u8]) -> Option<(u16, u16)> {
if payload.len() < 4 {
return None;
}
Some((
u16::from_be_bytes([payload[0], payload[1]]),
u16::from_be_bytes([payload[2], payload[3]]),
))
}
fn amqp_queue_declare_name(payload: &[u8]) -> Option<String> {
let mut idx = 4;
amqp_take_u16(payload, &mut idx)?;
amqp_take_shortstr(payload, &mut idx)
}
fn amqp_basic_publish_routing_key(payload: &[u8]) -> Option<String> {
let mut idx = 4;
amqp_take_u16(payload, &mut idx)?;
let _exchange = amqp_take_shortstr(payload, &mut idx)?;
amqp_take_shortstr(payload, &mut idx)
}
fn amqp_basic_get_queue(payload: &[u8]) -> Option<String> {
let mut idx = 4;
amqp_take_u16(payload, &mut idx)?;
amqp_take_shortstr(payload, &mut idx)
}
fn amqp_basic_consume_args(payload: &[u8]) -> Option<(String, String, bool)> {
let mut idx = 4;
amqp_take_u16(payload, &mut idx)?;
let queue = amqp_take_shortstr(payload, &mut idx)?;
let consumer_tag = amqp_take_shortstr(payload, &mut idx)?;
let bits = payload.get(idx).copied().unwrap_or(0);
Some((queue, consumer_tag, bits & 0b0000_0010 != 0))
}
fn amqp_basic_cancel_tag(payload: &[u8]) -> Option<String> {
let mut idx = 4;
amqp_take_shortstr(payload, &mut idx)
}
fn amqp_basic_ack_tag(payload: &[u8]) -> Option<(u64, bool)> {
let mut idx = 4;
let tag = amqp_take_u64(payload, &mut idx)?;
let bits = payload.get(idx).copied().unwrap_or(0);
Some((tag, bits & 1 != 0))
}
fn amqp_take_u16(payload: &[u8], idx: &mut usize) -> Option<u16> {
let end = *idx + 2;
let bytes: [u8; 2] = payload.get(*idx..end)?.try_into().ok()?;
*idx = end;
Some(u16::from_be_bytes(bytes))
}
fn amqp_take_u64(payload: &[u8], idx: &mut usize) -> Option<u64> {
let end = *idx + 8;
let bytes: [u8; 8] = payload.get(*idx..end)?.try_into().ok()?;
*idx = end;
Some(u64::from_be_bytes(bytes))
}
fn amqp_take_shortstr(payload: &[u8], idx: &mut usize) -> Option<String> {
let len = *payload.get(*idx)? as usize;
*idx += 1;
let end = *idx + len;
let s = String::from_utf8_lossy(payload.get(*idx..end)?).into_owned();
*idx = end;
Some(s)
}
fn amqp_push_u16(out: &mut Vec<u8>, value: u16) {
out.extend_from_slice(&value.to_be_bytes());
}
fn amqp_push_u32(out: &mut Vec<u8>, value: u32) {
out.extend_from_slice(&value.to_be_bytes());
}
fn amqp_push_u64(out: &mut Vec<u8>, value: u64) {
out.extend_from_slice(&value.to_be_bytes());
}
fn amqp_push_shortstr(out: &mut Vec<u8>, value: &str) {
let bytes = value.as_bytes();
let len = bytes.len().min(u8::MAX as usize);
out.push(len as u8);
out.extend_from_slice(&bytes[..len]);
}
fn amqp_push_longstr(out: &mut Vec<u8>, value: &str) {
let bytes = value.as_bytes();
amqp_push_u32(out, bytes.len() as u32);
out.extend_from_slice(bytes);
}
fn amqp_push_table_empty(out: &mut Vec<u8>) {
amqp_push_u32(out, 0);
}
fn rabbit_amqp_deliver_to_consumer(
state: &Arc<Mutex<RabbitAmqpState>>,
log_path: &Path,
queue: &str,
body: &[u8],
) -> bool {
let Some((consumer, delivery_tag)) = ({
let mut guard = match state.lock() {
Ok(guard) => guard,
Err(_) => return false,
};
let consumer = guard
.consumers
.get(queue)
.and_then(|consumers| consumers.first())
.cloned();
consumer.map(|consumer| {
guard.next_delivery_tag += 1;
let tag = guard.next_delivery_tag;
if !consumer.no_ack {
guard.inflight.insert(
tag,
(queue.to_owned(), String::from_utf8_lossy(body).into_owned()),
);
}
(consumer, tag)
})
}) else {
return false;
};
let Ok(mut writer) = consumer.writer.lock() else {
return false;
};
if amqp_write_basic_deliver(
&mut writer,
consumer.channel,
&consumer.consumer_tag,
delivery_tag,
queue,
body,
)
.is_ok()
{
let payload = String::from_utf8_lossy(body).into_owned();
let _ = append_broker_event(log_path, "deliver", queue, &payload);
true
} else {
false
}
}
fn rabbit_amqp_remove_consumers(state: &Arc<Mutex<RabbitAmqpState>>, consumer_tags: &[String]) {
if consumer_tags.is_empty() {
return;
}
if let Ok(mut guard) = state.lock() {
for consumers in guard.consumers.values_mut() {
consumers.retain(|consumer| !consumer_tags.contains(&consumer.consumer_tag));
}
}
}
#[derive(Debug)]
struct NatsListener {
port: u16,
@ -1202,8 +1878,8 @@ mod tests {
}
#[test]
fn remaining_brokers_expose_http_emulators() {
for kind in [StubKind::Pubsub, StubKind::Rabbit] {
fn pubsub_broker_exposes_http_emulator() {
for kind in [StubKind::Pubsub] {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(kind, dir.path()).unwrap();
let endpoint = stub.endpoint();
@ -1217,6 +1893,20 @@ mod tests {
}
}
#[test]
fn rabbit_broker_exposes_amqp_endpoint() {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap();
let endpoint = stub.endpoint();
if endpoint == "loopback://rabbit" {
return;
}
assert!(
endpoint.starts_with("amqp://127.0.0.1:"),
"Rabbit endpoint should be a protocol-compatible AMQP endpoint, got {endpoint}"
);
}
#[test]
fn nats_broker_exposes_protocol_endpoint() {
let dir = TempDir::new().unwrap();
@ -1316,11 +2006,8 @@ mod tests {
}
#[test]
fn remaining_http_broker_emulators_record_publish_deliver_ack() {
let cases = [
(StubKind::Pubsub, "topics", "projects/p/topics/orders"),
(StubKind::Rabbit, "queues", "work"),
];
fn pubsub_http_broker_emulator_records_publish_deliver_ack() {
let cases = [(StubKind::Pubsub, "topics", "projects/p/topics/orders")];
for (kind, root, destination) in cases {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(kind, dir.path()).unwrap();
@ -1377,6 +2064,176 @@ mod tests {
}
}
#[test]
fn rabbit_amqp_protocol_server_records_publish_deliver_ack() {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap();
let endpoint = stub.endpoint();
if endpoint == "loopback://rabbit" {
return;
}
let port: u16 = endpoint
.trim_start_matches("amqp://127.0.0.1:")
.split('/')
.next()
.unwrap()
.parse()
.unwrap();
let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
let mut reader = BufReader::new(s.try_clone().unwrap());
s.write_all(b"AMQP\0\0\x09\x01").unwrap();
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 0, 10, 10);
let mut start_ok = Vec::new();
amqp_push_table_empty(&mut start_ok);
amqp_push_shortstr(&mut start_ok, "PLAIN");
amqp_push_longstr(&mut start_ok, "\0guest\0guest");
amqp_push_shortstr(&mut start_ok, "en_US");
amqp_write_method(&mut s, 0, 10, 11, &start_ok).unwrap();
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 0, 10, 30);
let mut tune_ok = Vec::new();
amqp_push_u16(&mut tune_ok, 2047);
amqp_push_u32(&mut tune_ok, 131_072);
amqp_push_u16(&mut tune_ok, 0);
amqp_write_method(&mut s, 0, 10, 31, &tune_ok).unwrap();
let mut open = Vec::new();
amqp_push_shortstr(&mut open, "/");
amqp_push_shortstr(&mut open, "");
open.push(0);
amqp_write_method(&mut s, 0, 10, 40, &open).unwrap();
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 0, 10, 41);
let mut channel_open = Vec::new();
amqp_push_longstr(&mut channel_open, "");
amqp_write_method(&mut s, 1, 20, 10, &channel_open).unwrap();
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 20, 11);
let mut declare = Vec::new();
amqp_push_u16(&mut declare, 0);
amqp_push_shortstr(&mut declare, "work");
declare.push(0);
amqp_push_table_empty(&mut declare);
amqp_write_method(&mut s, 1, 50, 10, &declare).unwrap();
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 50, 11);
let mut publish = Vec::new();
amqp_push_u16(&mut publish, 0);
amqp_push_shortstr(&mut publish, "");
amqp_push_shortstr(&mut publish, "work");
publish.push(0);
amqp_write_method(&mut s, 1, 60, 40, &publish).unwrap();
amqp_write_content(&mut s, 1, b"NYX\tPAYLOAD").unwrap();
let mut get = Vec::new();
amqp_push_u16(&mut get, 0);
amqp_push_shortstr(&mut get, "work");
get.push(0);
amqp_write_method(&mut s, 1, 60, 70, &get).unwrap();
let get_ok = amqp_read_frame(&mut reader).unwrap();
assert_amqp_method_ref(&get_ok, 1, 60, 71);
let mut idx = 4;
let delivery_tag = amqp_take_u64(&get_ok.payload, &mut idx).unwrap();
let header = amqp_read_frame(&mut reader).unwrap();
assert_eq!(header.frame_type, AMQP_FRAME_HEADER);
let body = amqp_read_frame(&mut reader).unwrap();
assert_eq!(body.frame_type, AMQP_FRAME_BODY);
assert_eq!(body.payload, b"NYX\tPAYLOAD");
let mut ack = Vec::new();
amqp_push_u64(&mut ack, delivery_tag);
ack.push(0);
amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap();
std::thread::sleep(Duration::from_millis(25));
let events = stub.drain_events();
let actions: Vec<&str> = events
.iter()
.map(|ev| ev.detail.get("action").unwrap().as_str())
.collect();
assert_eq!(actions, vec!["publish", "deliver", "ack"]);
assert_eq!(events[0].detail.get("destination").unwrap(), "work");
assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tPAYLOAD");
assert_eq!(
events[2].detail.get("payload").unwrap(),
&delivery_tag.to_string()
);
}
#[test]
fn rabbit_amqp_basic_consume_receives_published_messages() {
let dir = TempDir::new().unwrap();
let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap();
let endpoint = stub.endpoint();
if endpoint == "loopback://rabbit" {
return;
}
let port: u16 = endpoint
.trim_start_matches("amqp://127.0.0.1:")
.split('/')
.next()
.unwrap()
.parse()
.unwrap();
let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
let mut reader = BufReader::new(s.try_clone().unwrap());
amqp_test_open_channel(&mut s, &mut reader);
let mut declare = Vec::new();
amqp_push_u16(&mut declare, 0);
amqp_push_shortstr(&mut declare, "work");
declare.push(0);
amqp_push_table_empty(&mut declare);
amqp_write_method(&mut s, 1, 50, 10, &declare).unwrap();
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 50, 11);
let mut consume = Vec::new();
amqp_push_u16(&mut consume, 0);
amqp_push_shortstr(&mut consume, "work");
amqp_push_shortstr(&mut consume, "ctag");
consume.push(0);
amqp_push_table_empty(&mut consume);
amqp_write_method(&mut s, 1, 60, 20, &consume).unwrap();
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 60, 21);
let mut publish = Vec::new();
amqp_push_u16(&mut publish, 0);
amqp_push_shortstr(&mut publish, "");
amqp_push_shortstr(&mut publish, "work");
publish.push(0);
amqp_write_method(&mut s, 1, 60, 40, &publish).unwrap();
amqp_write_content(&mut s, 1, b"async payload").unwrap();
let deliver = amqp_read_frame(&mut reader).unwrap();
assert_amqp_method_ref(&deliver, 1, 60, 60);
let mut idx = 4;
assert_eq!(
amqp_take_shortstr(&deliver.payload, &mut idx).unwrap(),
"ctag"
);
let delivery_tag = amqp_take_u64(&deliver.payload, &mut idx).unwrap();
let header = amqp_read_frame(&mut reader).unwrap();
assert_eq!(header.frame_type, AMQP_FRAME_HEADER);
let body = amqp_read_frame(&mut reader).unwrap();
assert_eq!(body.frame_type, AMQP_FRAME_BODY);
assert_eq!(body.payload, b"async payload");
let mut ack = Vec::new();
amqp_push_u64(&mut ack, delivery_tag);
ack.push(0);
amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap();
std::thread::sleep(Duration::from_millis(25));
let events = stub.drain_events();
let actions: Vec<&str> = events
.iter()
.map(|ev| ev.detail.get("action").unwrap().as_str())
.collect();
assert_eq!(actions, vec!["publish", "deliver", "ack"]);
assert_eq!(events[1].detail.get("payload").unwrap(), "async payload");
}
#[test]
fn nats_protocol_server_records_publish_deliver() {
let dir = TempDir::new().unwrap();
@ -1495,6 +2352,47 @@ mod tests {
out
}
fn assert_amqp_method(frame: AmqpFrame, channel: u16, class_id: u16, method_id: u16) {
assert_amqp_method_ref(&frame, channel, class_id, method_id);
}
fn assert_amqp_method_ref(frame: &AmqpFrame, channel: u16, class_id: u16, method_id: u16) {
assert_eq!(frame.frame_type, AMQP_FRAME_METHOD);
assert_eq!(frame.channel, channel);
assert_eq!(amqp_method_id(&frame.payload), Some((class_id, method_id)));
}
fn amqp_test_open_channel(s: &mut TcpStream, reader: &mut BufReader<TcpStream>) {
s.write_all(b"AMQP\0\0\x09\x01").unwrap();
assert_amqp_method(amqp_read_frame(reader).unwrap(), 0, 10, 10);
let mut start_ok = Vec::new();
amqp_push_table_empty(&mut start_ok);
amqp_push_shortstr(&mut start_ok, "PLAIN");
amqp_push_longstr(&mut start_ok, "\0guest\0guest");
amqp_push_shortstr(&mut start_ok, "en_US");
amqp_write_method(s, 0, 10, 11, &start_ok).unwrap();
assert_amqp_method(amqp_read_frame(reader).unwrap(), 0, 10, 30);
let mut tune_ok = Vec::new();
amqp_push_u16(&mut tune_ok, 2047);
amqp_push_u32(&mut tune_ok, 131_072);
amqp_push_u16(&mut tune_ok, 0);
amqp_write_method(s, 0, 10, 31, &tune_ok).unwrap();
let mut open = Vec::new();
amqp_push_shortstr(&mut open, "/");
amqp_push_shortstr(&mut open, "");
open.push(0);
amqp_write_method(s, 0, 10, 40, &open).unwrap();
assert_amqp_method(amqp_read_frame(reader).unwrap(), 0, 10, 41);
let mut channel_open = Vec::new();
amqp_push_longstr(&mut channel_open, "");
amqp_write_method(s, 1, 20, 10, &channel_open).unwrap();
assert_amqp_method(amqp_read_frame(reader).unwrap(), 1, 20, 11);
}
fn form_escape(input: &str) -> String {
let mut out = String::new();
for b in input.bytes() {