diff --git a/src/dynamic/sandbox/mod.rs b/src/dynamic/sandbox/mod.rs index 00d39555..800c5e95 100644 --- a/src/dynamic/sandbox/mod.rs +++ b/src/dynamic/sandbox/mod.rs @@ -915,6 +915,11 @@ fn rewrite_extra_env_for_container( { return (k.clone(), format!("nats://host-gateway:{rest}")); } + if k == "NYX_RABBIT_ENDPOINT" + && let Some(rest) = v.strip_prefix("amqp://127.0.0.1:") + { + return (k.clone(), format!("amqp://host-gateway:{rest}")); + } (k.clone(), v.clone()) }) .collect() @@ -2302,7 +2307,7 @@ mod tests { ), ( "NYX_RABBIT_ENDPOINT".to_owned(), - "http://127.0.0.1:45678/queues".to_owned(), + "amqp://127.0.0.1:45678/%2f".to_owned(), ), ( "NYX_NATS_ENDPOINT".to_owned(), @@ -2331,7 +2336,7 @@ mod tests { ), ( "NYX_RABBIT_ENDPOINT".to_owned(), - "http://host-gateway:45678/queues".to_owned(), + "amqp://host-gateway:45678/%2f".to_owned(), ), ( "NYX_NATS_ENDPOINT".to_owned(), diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs index b4cd4768..a11080b9 100644 --- a/src/dynamic/stubs/broker.rs +++ b/src/dynamic/stubs/broker.rs @@ -33,6 +33,7 @@ pub struct BrokerStub { kafka_listener: Option, sqs_listener: Option, http_listener: Option, + rabbit_amqp_listener: Option, nats_listener: Option, } @@ -60,6 +61,11 @@ impl BrokerStub { } else { None }; + let rabbit_amqp_listener = if kind == StubKind::Rabbit { + start_rabbit_amqp_listener(log_path.clone())? + } else { + None + }; let nats_listener = if kind == StubKind::Nats { start_nats_listener(log_path.clone())? } else { @@ -73,6 +79,7 @@ impl BrokerStub { kafka_listener, sqs_listener, http_listener, + rabbit_amqp_listener, nats_listener, }) } @@ -134,6 +141,9 @@ impl StubProvider for BrokerStub { if let Some(listener) = &self.sqs_listener { return format!("http://127.0.0.1:{}", listener.port); } + if let Some(listener) = &self.rabbit_amqp_listener { + return format!("amqp://127.0.0.1:{}/%2f", listener.port); + } if let Some(listener) = &self.http_listener { return format!("http://127.0.0.1:{}", listener.port); } @@ -224,6 +234,10 @@ impl Drop for BrokerStub { listener.shutdown.store(true, Ordering::Relaxed); let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); } + if let Some(listener) = &self.rabbit_amqp_listener { + listener.shutdown.store(true, Ordering::Relaxed); + let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); + } if let Some(listener) = &self.nats_listener { listener.shutdown.store(true, Ordering::Relaxed); let _ = TcpStream::connect(format!("127.0.0.1:{}", listener.port)); @@ -688,6 +702,668 @@ fn http_broker_message_json( } } +#[derive(Debug)] +struct RabbitAmqpListener { + port: u16, + shutdown: Arc, +} + +#[derive(Debug, Default)] +struct RabbitAmqpState { + next_delivery_tag: u64, + next_consumer_tag: u64, + queues: BTreeMap>, + inflight: BTreeMap, + consumers: BTreeMap>, +} + +#[derive(Debug, Clone)] +struct RabbitAmqpConsumer { + consumer_tag: String, + channel: u16, + no_ack: bool, + writer: Arc>, +} + +#[derive(Debug)] +struct AmqpFrame { + frame_type: u8, + channel: u16, + payload: Vec, +} + +const AMQP_FRAME_METHOD: u8 = 1; +const AMQP_FRAME_HEADER: u8 = 2; +const AMQP_FRAME_BODY: u8 = 3; +const AMQP_FRAME_HEARTBEAT: u8 = 8; +const AMQP_FRAME_END: u8 = 0xce; + +fn start_rabbit_amqp_listener(log_path: PathBuf) -> std::io::Result> { + let listener = match TcpListener::bind("127.0.0.1:0") { + Ok(listener) => listener, + Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None), + Err(e) => return Err(e), + }; + let port = listener.local_addr()?.port(); + let shutdown = Arc::new(AtomicBool::new(false)); + let state = Arc::new(Mutex::new(RabbitAmqpState::default())); + let shutdown_clone = Arc::clone(&shutdown); + let state_clone = Arc::clone(&state); + std::thread::spawn(move || { + rabbit_amqp_accept_loop(listener, shutdown_clone, state_clone, log_path) + }); + Ok(Some(RabbitAmqpListener { port, shutdown })) +} + +fn rabbit_amqp_accept_loop( + listener: TcpListener, + shutdown: Arc, + state: Arc>, + log_path: PathBuf, +) { + for stream in listener.incoming() { + if shutdown.load(Ordering::Relaxed) { + break; + } + let Ok(stream) = stream else { continue }; + let _ = stream.set_read_timeout(Some(Duration::from_secs(5))); + let _ = stream.set_write_timeout(Some(Duration::from_secs(5))); + let state = Arc::clone(&state); + let log_path = log_path.clone(); + std::thread::spawn(move || handle_rabbit_amqp_connection(stream, state, &log_path)); + } +} + +fn handle_rabbit_amqp_connection( + stream: TcpStream, + state: Arc>, + log_path: &Path, +) { + let Ok(mut writer) = stream.try_clone() else { + return; + }; + let consumer_writer = match stream.try_clone() { + Ok(stream) => Arc::new(Mutex::new(stream)), + Err(_) => return, + }; + let mut reader = BufReader::new(stream); + let mut protocol = [0_u8; 8]; + if reader.read_exact(&mut protocol).is_err() || &protocol != b"AMQP\0\0\x09\x01" { + return; + } + if amqp_write_connection_start(&mut writer).is_err() { + return; + } + + let mut owned_consumer_tags = Vec::new(); + loop { + let Some(frame) = amqp_read_frame(&mut reader) else { + break; + }; + if frame.frame_type == AMQP_FRAME_HEARTBEAT { + let _ = amqp_write_frame(&mut writer, AMQP_FRAME_HEARTBEAT, 0, &[]); + continue; + } + if frame.frame_type != AMQP_FRAME_METHOD { + continue; + } + let Some((class_id, method_id)) = amqp_method_id(&frame.payload) else { + break; + }; + match (class_id, method_id) { + // connection.start-ok + (10, 11) => { + if amqp_write_connection_tune(&mut writer).is_err() { + break; + } + } + // connection.tune-ok + (10, 31) => {} + // connection.open + (10, 40) => { + if amqp_write_connection_open_ok(&mut writer).is_err() { + break; + } + } + // connection.close + (10, 50) => { + let _ = amqp_write_method(&mut writer, frame.channel, 10, 51, &[]); + break; + } + // channel.open + (20, 10) => { + let mut args = Vec::new(); + amqp_push_longstr(&mut args, ""); + if amqp_write_method(&mut writer, frame.channel, 20, 11, &args).is_err() { + break; + } + } + // channel.close + (20, 40) => { + if amqp_write_method(&mut writer, frame.channel, 20, 41, &[]).is_err() { + break; + } + } + // basic.qos + (60, 10) => { + if amqp_write_method(&mut writer, frame.channel, 60, 11, &[]).is_err() { + break; + } + } + // basic.consume + (60, 20) => { + let Some((queue, requested_tag, no_ack)) = amqp_basic_consume_args(&frame.payload) + else { + continue; + }; + let queue = if queue.is_empty() { + "default".to_owned() + } else { + queue + }; + let consumer_tag = if let Ok(mut guard) = state.lock() { + let tag = if requested_tag.is_empty() { + guard.next_consumer_tag += 1; + format!("nyx-consumer-{}", guard.next_consumer_tag) + } else { + requested_tag + }; + guard + .consumers + .entry(queue) + .or_default() + .push(RabbitAmqpConsumer { + consumer_tag: tag.clone(), + channel: frame.channel, + no_ack, + writer: Arc::clone(&consumer_writer), + }); + tag + } else { + requested_tag + }; + owned_consumer_tags.push(consumer_tag.clone()); + if amqp_write_basic_consume_ok(&mut writer, frame.channel, &consumer_tag).is_err() { + break; + } + } + // basic.cancel + (60, 30) => { + if let Some(consumer_tag) = amqp_basic_cancel_tag(&frame.payload) { + rabbit_amqp_remove_consumers(&state, &[consumer_tag.clone()]); + if amqp_write_basic_cancel_ok(&mut writer, frame.channel, &consumer_tag) + .is_err() + { + break; + } + } + } + // queue.declare + (50, 10) => { + let queue = amqp_queue_declare_name(&frame.payload) + .filter(|q| !q.is_empty()) + .unwrap_or_else(|| "nyx-auto".to_owned()); + let message_count = if let Ok(mut guard) = state.lock() { + guard.queues.entry(queue.clone()).or_default().len() as u32 + } else { + 0 + }; + if amqp_write_queue_declare_ok(&mut writer, frame.channel, &queue, message_count) + .is_err() + { + break; + } + } + // basic.publish + (60, 40) => { + let routing_key = amqp_basic_publish_routing_key(&frame.payload) + .filter(|q| !q.is_empty()) + .unwrap_or_else(|| "default".to_owned()); + let Some(body) = amqp_read_content_body(&mut reader, frame.channel) else { + break; + }; + let payload = String::from_utf8_lossy(&body).into_owned(); + if !rabbit_amqp_deliver_to_consumer( + &state, + log_path, + &routing_key, + payload.as_bytes(), + ) && let Ok(mut guard) = state.lock() + { + guard + .queues + .entry(routing_key.clone()) + .or_default() + .push_back(payload.clone()); + } + let _ = append_broker_event(log_path, "publish", &routing_key, &payload); + } + // basic.get + (60, 70) => { + let queue = amqp_basic_get_queue(&frame.payload) + .filter(|q| !q.is_empty()) + .unwrap_or_else(|| "default".to_owned()); + let (delivery_tag, payload, remaining) = if let Ok(mut guard) = state.lock() { + let body = guard.queues.entry(queue.clone()).or_default().pop_front(); + if let Some(body) = body { + guard.next_delivery_tag += 1; + let tag = guard.next_delivery_tag; + let remaining = guard.queues.get(&queue).map(VecDeque::len).unwrap_or(0); + guard.inflight.insert(tag, (queue.clone(), body.clone())); + (Some(tag), Some(body), remaining as u32) + } else { + (None, None, 0) + } + } else { + (None, None, 0) + }; + if let (Some(tag), Some(payload)) = (delivery_tag, payload) { + let _ = append_broker_event(log_path, "deliver", &queue, &payload); + if amqp_write_basic_get_ok( + &mut writer, + frame.channel, + tag, + &queue, + remaining, + payload.as_bytes(), + ) + .is_err() + { + break; + } + } else if amqp_write_basic_get_empty(&mut writer, frame.channel).is_err() { + break; + } + } + // basic.ack + (60, 80) => { + let Some((delivery_tag, multiple)) = amqp_basic_ack_tag(&frame.payload) else { + continue; + }; + let mut acked = Vec::new(); + if let Ok(mut guard) = state.lock() { + if multiple { + let tags: Vec = guard + .inflight + .keys() + .copied() + .filter(|tag| *tag <= delivery_tag) + .collect(); + for tag in tags { + if let Some((queue, _payload)) = guard.inflight.remove(&tag) { + acked.push((queue, tag)); + } + } + } else if let Some((queue, _payload)) = guard.inflight.remove(&delivery_tag) { + acked.push((queue, delivery_tag)); + } + } + for (queue, tag) in acked { + let _ = append_broker_event(log_path, "ack", &queue, &tag.to_string()); + } + } + _ => {} + } + } + rabbit_amqp_remove_consumers(&state, &owned_consumer_tags); +} + +fn amqp_read_frame(reader: &mut BufReader) -> Option { + let mut header = [0_u8; 7]; + reader.read_exact(&mut header).ok()?; + let frame_type = header[0]; + let channel = u16::from_be_bytes([header[1], header[2]]); + let size = u32::from_be_bytes([header[3], header[4], header[5], header[6]]) as usize; + if size > 1024 * 1024 { + return None; + } + let mut payload = vec![0_u8; size]; + if size > 0 { + reader.read_exact(&mut payload).ok()?; + } + let mut end = [0_u8; 1]; + reader.read_exact(&mut end).ok()?; + if end[0] != AMQP_FRAME_END { + return None; + } + Some(AmqpFrame { + frame_type, + channel, + payload, + }) +} + +fn amqp_write_connection_start(writer: &mut TcpStream) -> std::io::Result<()> { + let mut args = vec![0, 9]; + amqp_push_table_empty(&mut args); + amqp_push_longstr(&mut args, "PLAIN AMQPLAIN"); + amqp_push_longstr(&mut args, "en_US"); + amqp_write_method(writer, 0, 10, 10, &args) +} + +fn amqp_write_connection_tune(writer: &mut TcpStream) -> std::io::Result<()> { + let mut args = Vec::new(); + amqp_push_u16(&mut args, 2047); + amqp_push_u32(&mut args, 131_072); + amqp_push_u16(&mut args, 0); + amqp_write_method(writer, 0, 10, 30, &args) +} + +fn amqp_write_connection_open_ok(writer: &mut TcpStream) -> std::io::Result<()> { + let mut args = Vec::new(); + amqp_push_shortstr(&mut args, ""); + amqp_write_method(writer, 0, 10, 41, &args) +} + +fn amqp_write_queue_declare_ok( + writer: &mut TcpStream, + channel: u16, + queue: &str, + message_count: u32, +) -> std::io::Result<()> { + let mut args = Vec::new(); + amqp_push_shortstr(&mut args, queue); + amqp_push_u32(&mut args, message_count); + amqp_push_u32(&mut args, 0); + amqp_write_method(writer, channel, 50, 11, &args) +} + +fn amqp_write_basic_get_ok( + writer: &mut TcpStream, + channel: u16, + delivery_tag: u64, + routing_key: &str, + message_count: u32, + body: &[u8], +) -> std::io::Result<()> { + let mut args = Vec::new(); + amqp_push_u64(&mut args, delivery_tag); + args.push(0); + amqp_push_shortstr(&mut args, ""); + amqp_push_shortstr(&mut args, routing_key); + amqp_push_u32(&mut args, message_count); + amqp_write_method(writer, channel, 60, 71, &args)?; + amqp_write_content(writer, channel, body) +} + +fn amqp_write_basic_get_empty(writer: &mut TcpStream, channel: u16) -> std::io::Result<()> { + let mut args = Vec::new(); + amqp_push_shortstr(&mut args, ""); + amqp_write_method(writer, channel, 60, 72, &args) +} + +fn amqp_write_basic_consume_ok( + writer: &mut TcpStream, + channel: u16, + consumer_tag: &str, +) -> std::io::Result<()> { + let mut args = Vec::new(); + amqp_push_shortstr(&mut args, consumer_tag); + amqp_write_method(writer, channel, 60, 21, &args) +} + +fn amqp_write_basic_cancel_ok( + writer: &mut TcpStream, + channel: u16, + consumer_tag: &str, +) -> std::io::Result<()> { + let mut args = Vec::new(); + amqp_push_shortstr(&mut args, consumer_tag); + amqp_write_method(writer, channel, 60, 31, &args) +} + +fn amqp_write_basic_deliver( + writer: &mut TcpStream, + channel: u16, + consumer_tag: &str, + delivery_tag: u64, + routing_key: &str, + body: &[u8], +) -> std::io::Result<()> { + let mut args = Vec::new(); + amqp_push_shortstr(&mut args, consumer_tag); + amqp_push_u64(&mut args, delivery_tag); + args.push(0); + amqp_push_shortstr(&mut args, ""); + amqp_push_shortstr(&mut args, routing_key); + amqp_write_method(writer, channel, 60, 60, &args)?; + amqp_write_content(writer, channel, body) +} + +fn amqp_write_content(writer: &mut TcpStream, channel: u16, body: &[u8]) -> std::io::Result<()> { + let mut header = Vec::new(); + amqp_push_u16(&mut header, 60); + amqp_push_u16(&mut header, 0); + amqp_push_u64(&mut header, body.len() as u64); + amqp_push_u16(&mut header, 0); + amqp_write_frame(writer, AMQP_FRAME_HEADER, channel, &header)?; + amqp_write_frame(writer, AMQP_FRAME_BODY, channel, body) +} + +fn amqp_write_method( + writer: &mut TcpStream, + channel: u16, + class_id: u16, + method_id: u16, + args: &[u8], +) -> std::io::Result<()> { + let mut payload = Vec::with_capacity(4 + args.len()); + amqp_push_u16(&mut payload, class_id); + amqp_push_u16(&mut payload, method_id); + payload.extend_from_slice(args); + amqp_write_frame(writer, AMQP_FRAME_METHOD, channel, &payload) +} + +fn amqp_write_frame( + writer: &mut TcpStream, + frame_type: u8, + channel: u16, + payload: &[u8], +) -> std::io::Result<()> { + writer.write_all(&[frame_type])?; + writer.write_all(&channel.to_be_bytes())?; + writer.write_all(&(payload.len() as u32).to_be_bytes())?; + writer.write_all(payload)?; + writer.write_all(&[AMQP_FRAME_END]) +} + +fn amqp_read_content_body(reader: &mut BufReader, channel: u16) -> Option> { + let header = loop { + let frame = amqp_read_frame(reader)?; + if frame.frame_type == AMQP_FRAME_HEARTBEAT { + continue; + } + if frame.frame_type == AMQP_FRAME_HEADER && frame.channel == channel { + break frame; + } + return None; + }; + if header.payload.len() < 12 { + return None; + } + let size = u64::from_be_bytes(header.payload[4..12].try_into().ok()?) as usize; + if size > 1024 * 1024 { + return None; + } + let mut body = Vec::with_capacity(size); + while body.len() < size { + let frame = amqp_read_frame(reader)?; + if frame.frame_type == AMQP_FRAME_HEARTBEAT { + continue; + } + if frame.frame_type != AMQP_FRAME_BODY || frame.channel != channel { + return None; + } + body.extend_from_slice(&frame.payload); + } + body.truncate(size); + Some(body) +} + +fn amqp_method_id(payload: &[u8]) -> Option<(u16, u16)> { + if payload.len() < 4 { + return None; + } + Some(( + u16::from_be_bytes([payload[0], payload[1]]), + u16::from_be_bytes([payload[2], payload[3]]), + )) +} + +fn amqp_queue_declare_name(payload: &[u8]) -> Option { + let mut idx = 4; + amqp_take_u16(payload, &mut idx)?; + amqp_take_shortstr(payload, &mut idx) +} + +fn amqp_basic_publish_routing_key(payload: &[u8]) -> Option { + let mut idx = 4; + amqp_take_u16(payload, &mut idx)?; + let _exchange = amqp_take_shortstr(payload, &mut idx)?; + amqp_take_shortstr(payload, &mut idx) +} + +fn amqp_basic_get_queue(payload: &[u8]) -> Option { + let mut idx = 4; + amqp_take_u16(payload, &mut idx)?; + amqp_take_shortstr(payload, &mut idx) +} + +fn amqp_basic_consume_args(payload: &[u8]) -> Option<(String, String, bool)> { + let mut idx = 4; + amqp_take_u16(payload, &mut idx)?; + let queue = amqp_take_shortstr(payload, &mut idx)?; + let consumer_tag = amqp_take_shortstr(payload, &mut idx)?; + let bits = payload.get(idx).copied().unwrap_or(0); + Some((queue, consumer_tag, bits & 0b0000_0010 != 0)) +} + +fn amqp_basic_cancel_tag(payload: &[u8]) -> Option { + let mut idx = 4; + amqp_take_shortstr(payload, &mut idx) +} + +fn amqp_basic_ack_tag(payload: &[u8]) -> Option<(u64, bool)> { + let mut idx = 4; + let tag = amqp_take_u64(payload, &mut idx)?; + let bits = payload.get(idx).copied().unwrap_or(0); + Some((tag, bits & 1 != 0)) +} + +fn amqp_take_u16(payload: &[u8], idx: &mut usize) -> Option { + let end = *idx + 2; + let bytes: [u8; 2] = payload.get(*idx..end)?.try_into().ok()?; + *idx = end; + Some(u16::from_be_bytes(bytes)) +} + +fn amqp_take_u64(payload: &[u8], idx: &mut usize) -> Option { + let end = *idx + 8; + let bytes: [u8; 8] = payload.get(*idx..end)?.try_into().ok()?; + *idx = end; + Some(u64::from_be_bytes(bytes)) +} + +fn amqp_take_shortstr(payload: &[u8], idx: &mut usize) -> Option { + let len = *payload.get(*idx)? as usize; + *idx += 1; + let end = *idx + len; + let s = String::from_utf8_lossy(payload.get(*idx..end)?).into_owned(); + *idx = end; + Some(s) +} + +fn amqp_push_u16(out: &mut Vec, value: u16) { + out.extend_from_slice(&value.to_be_bytes()); +} + +fn amqp_push_u32(out: &mut Vec, value: u32) { + out.extend_from_slice(&value.to_be_bytes()); +} + +fn amqp_push_u64(out: &mut Vec, value: u64) { + out.extend_from_slice(&value.to_be_bytes()); +} + +fn amqp_push_shortstr(out: &mut Vec, value: &str) { + let bytes = value.as_bytes(); + let len = bytes.len().min(u8::MAX as usize); + out.push(len as u8); + out.extend_from_slice(&bytes[..len]); +} + +fn amqp_push_longstr(out: &mut Vec, value: &str) { + let bytes = value.as_bytes(); + amqp_push_u32(out, bytes.len() as u32); + out.extend_from_slice(bytes); +} + +fn amqp_push_table_empty(out: &mut Vec) { + amqp_push_u32(out, 0); +} + +fn rabbit_amqp_deliver_to_consumer( + state: &Arc>, + log_path: &Path, + queue: &str, + body: &[u8], +) -> bool { + let Some((consumer, delivery_tag)) = ({ + let mut guard = match state.lock() { + Ok(guard) => guard, + Err(_) => return false, + }; + let consumer = guard + .consumers + .get(queue) + .and_then(|consumers| consumers.first()) + .cloned(); + consumer.map(|consumer| { + guard.next_delivery_tag += 1; + let tag = guard.next_delivery_tag; + if !consumer.no_ack { + guard.inflight.insert( + tag, + (queue.to_owned(), String::from_utf8_lossy(body).into_owned()), + ); + } + (consumer, tag) + }) + }) else { + return false; + }; + let Ok(mut writer) = consumer.writer.lock() else { + return false; + }; + if amqp_write_basic_deliver( + &mut writer, + consumer.channel, + &consumer.consumer_tag, + delivery_tag, + queue, + body, + ) + .is_ok() + { + let payload = String::from_utf8_lossy(body).into_owned(); + let _ = append_broker_event(log_path, "deliver", queue, &payload); + true + } else { + false + } +} + +fn rabbit_amqp_remove_consumers(state: &Arc>, consumer_tags: &[String]) { + if consumer_tags.is_empty() { + return; + } + if let Ok(mut guard) = state.lock() { + for consumers in guard.consumers.values_mut() { + consumers.retain(|consumer| !consumer_tags.contains(&consumer.consumer_tag)); + } + } +} + #[derive(Debug)] struct NatsListener { port: u16, @@ -1202,8 +1878,8 @@ mod tests { } #[test] - fn remaining_brokers_expose_http_emulators() { - for kind in [StubKind::Pubsub, StubKind::Rabbit] { + fn pubsub_broker_exposes_http_emulator() { + for kind in [StubKind::Pubsub] { let dir = TempDir::new().unwrap(); let stub = BrokerStub::start(kind, dir.path()).unwrap(); let endpoint = stub.endpoint(); @@ -1217,6 +1893,20 @@ mod tests { } } + #[test] + fn rabbit_broker_exposes_amqp_endpoint() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://rabbit" { + return; + } + assert!( + endpoint.starts_with("amqp://127.0.0.1:"), + "Rabbit endpoint should be a protocol-compatible AMQP endpoint, got {endpoint}" + ); + } + #[test] fn nats_broker_exposes_protocol_endpoint() { let dir = TempDir::new().unwrap(); @@ -1316,11 +2006,8 @@ mod tests { } #[test] - fn remaining_http_broker_emulators_record_publish_deliver_ack() { - let cases = [ - (StubKind::Pubsub, "topics", "projects/p/topics/orders"), - (StubKind::Rabbit, "queues", "work"), - ]; + fn pubsub_http_broker_emulator_records_publish_deliver_ack() { + let cases = [(StubKind::Pubsub, "topics", "projects/p/topics/orders")]; for (kind, root, destination) in cases { let dir = TempDir::new().unwrap(); let stub = BrokerStub::start(kind, dir.path()).unwrap(); @@ -1377,6 +2064,176 @@ mod tests { } } + #[test] + fn rabbit_amqp_protocol_server_records_publish_deliver_ack() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://rabbit" { + return; + } + let port: u16 = endpoint + .trim_start_matches("amqp://127.0.0.1:") + .split('/') + .next() + .unwrap() + .parse() + .unwrap(); + let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap(); + let mut reader = BufReader::new(s.try_clone().unwrap()); + s.write_all(b"AMQP\0\0\x09\x01").unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 0, 10, 10); + + let mut start_ok = Vec::new(); + amqp_push_table_empty(&mut start_ok); + amqp_push_shortstr(&mut start_ok, "PLAIN"); + amqp_push_longstr(&mut start_ok, "\0guest\0guest"); + amqp_push_shortstr(&mut start_ok, "en_US"); + amqp_write_method(&mut s, 0, 10, 11, &start_ok).unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 0, 10, 30); + + let mut tune_ok = Vec::new(); + amqp_push_u16(&mut tune_ok, 2047); + amqp_push_u32(&mut tune_ok, 131_072); + amqp_push_u16(&mut tune_ok, 0); + amqp_write_method(&mut s, 0, 10, 31, &tune_ok).unwrap(); + + let mut open = Vec::new(); + amqp_push_shortstr(&mut open, "/"); + amqp_push_shortstr(&mut open, ""); + open.push(0); + amqp_write_method(&mut s, 0, 10, 40, &open).unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 0, 10, 41); + + let mut channel_open = Vec::new(); + amqp_push_longstr(&mut channel_open, ""); + amqp_write_method(&mut s, 1, 20, 10, &channel_open).unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 20, 11); + + let mut declare = Vec::new(); + amqp_push_u16(&mut declare, 0); + amqp_push_shortstr(&mut declare, "work"); + declare.push(0); + amqp_push_table_empty(&mut declare); + amqp_write_method(&mut s, 1, 50, 10, &declare).unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 50, 11); + + let mut publish = Vec::new(); + amqp_push_u16(&mut publish, 0); + amqp_push_shortstr(&mut publish, ""); + amqp_push_shortstr(&mut publish, "work"); + publish.push(0); + amqp_write_method(&mut s, 1, 60, 40, &publish).unwrap(); + amqp_write_content(&mut s, 1, b"NYX\tPAYLOAD").unwrap(); + + let mut get = Vec::new(); + amqp_push_u16(&mut get, 0); + amqp_push_shortstr(&mut get, "work"); + get.push(0); + amqp_write_method(&mut s, 1, 60, 70, &get).unwrap(); + let get_ok = amqp_read_frame(&mut reader).unwrap(); + assert_amqp_method_ref(&get_ok, 1, 60, 71); + let mut idx = 4; + let delivery_tag = amqp_take_u64(&get_ok.payload, &mut idx).unwrap(); + let header = amqp_read_frame(&mut reader).unwrap(); + assert_eq!(header.frame_type, AMQP_FRAME_HEADER); + let body = amqp_read_frame(&mut reader).unwrap(); + assert_eq!(body.frame_type, AMQP_FRAME_BODY); + assert_eq!(body.payload, b"NYX\tPAYLOAD"); + + let mut ack = Vec::new(); + amqp_push_u64(&mut ack, delivery_tag); + ack.push(0); + amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap(); + std::thread::sleep(Duration::from_millis(25)); + + let events = stub.drain_events(); + let actions: Vec<&str> = events + .iter() + .map(|ev| ev.detail.get("action").unwrap().as_str()) + .collect(); + assert_eq!(actions, vec!["publish", "deliver", "ack"]); + assert_eq!(events[0].detail.get("destination").unwrap(), "work"); + assert_eq!(events[1].detail.get("payload").unwrap(), "NYX\tPAYLOAD"); + assert_eq!( + events[2].detail.get("payload").unwrap(), + &delivery_tag.to_string() + ); + } + + #[test] + fn rabbit_amqp_basic_consume_receives_published_messages() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://rabbit" { + return; + } + let port: u16 = endpoint + .trim_start_matches("amqp://127.0.0.1:") + .split('/') + .next() + .unwrap() + .parse() + .unwrap(); + let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap(); + let mut reader = BufReader::new(s.try_clone().unwrap()); + amqp_test_open_channel(&mut s, &mut reader); + + let mut declare = Vec::new(); + amqp_push_u16(&mut declare, 0); + amqp_push_shortstr(&mut declare, "work"); + declare.push(0); + amqp_push_table_empty(&mut declare); + amqp_write_method(&mut s, 1, 50, 10, &declare).unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 50, 11); + + let mut consume = Vec::new(); + amqp_push_u16(&mut consume, 0); + amqp_push_shortstr(&mut consume, "work"); + amqp_push_shortstr(&mut consume, "ctag"); + consume.push(0); + amqp_push_table_empty(&mut consume); + amqp_write_method(&mut s, 1, 60, 20, &consume).unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 60, 21); + + let mut publish = Vec::new(); + amqp_push_u16(&mut publish, 0); + amqp_push_shortstr(&mut publish, ""); + amqp_push_shortstr(&mut publish, "work"); + publish.push(0); + amqp_write_method(&mut s, 1, 60, 40, &publish).unwrap(); + amqp_write_content(&mut s, 1, b"async payload").unwrap(); + + let deliver = amqp_read_frame(&mut reader).unwrap(); + assert_amqp_method_ref(&deliver, 1, 60, 60); + let mut idx = 4; + assert_eq!( + amqp_take_shortstr(&deliver.payload, &mut idx).unwrap(), + "ctag" + ); + let delivery_tag = amqp_take_u64(&deliver.payload, &mut idx).unwrap(); + let header = amqp_read_frame(&mut reader).unwrap(); + assert_eq!(header.frame_type, AMQP_FRAME_HEADER); + let body = amqp_read_frame(&mut reader).unwrap(); + assert_eq!(body.frame_type, AMQP_FRAME_BODY); + assert_eq!(body.payload, b"async payload"); + + let mut ack = Vec::new(); + amqp_push_u64(&mut ack, delivery_tag); + ack.push(0); + amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap(); + std::thread::sleep(Duration::from_millis(25)); + + let events = stub.drain_events(); + let actions: Vec<&str> = events + .iter() + .map(|ev| ev.detail.get("action").unwrap().as_str()) + .collect(); + assert_eq!(actions, vec!["publish", "deliver", "ack"]); + assert_eq!(events[1].detail.get("payload").unwrap(), "async payload"); + } + #[test] fn nats_protocol_server_records_publish_deliver() { let dir = TempDir::new().unwrap(); @@ -1495,6 +2352,47 @@ mod tests { out } + fn assert_amqp_method(frame: AmqpFrame, channel: u16, class_id: u16, method_id: u16) { + assert_amqp_method_ref(&frame, channel, class_id, method_id); + } + + fn assert_amqp_method_ref(frame: &AmqpFrame, channel: u16, class_id: u16, method_id: u16) { + assert_eq!(frame.frame_type, AMQP_FRAME_METHOD); + assert_eq!(frame.channel, channel); + assert_eq!(amqp_method_id(&frame.payload), Some((class_id, method_id))); + } + + fn amqp_test_open_channel(s: &mut TcpStream, reader: &mut BufReader) { + s.write_all(b"AMQP\0\0\x09\x01").unwrap(); + assert_amqp_method(amqp_read_frame(reader).unwrap(), 0, 10, 10); + + let mut start_ok = Vec::new(); + amqp_push_table_empty(&mut start_ok); + amqp_push_shortstr(&mut start_ok, "PLAIN"); + amqp_push_longstr(&mut start_ok, "\0guest\0guest"); + amqp_push_shortstr(&mut start_ok, "en_US"); + amqp_write_method(s, 0, 10, 11, &start_ok).unwrap(); + assert_amqp_method(amqp_read_frame(reader).unwrap(), 0, 10, 30); + + let mut tune_ok = Vec::new(); + amqp_push_u16(&mut tune_ok, 2047); + amqp_push_u32(&mut tune_ok, 131_072); + amqp_push_u16(&mut tune_ok, 0); + amqp_write_method(s, 0, 10, 31, &tune_ok).unwrap(); + + let mut open = Vec::new(); + amqp_push_shortstr(&mut open, "/"); + amqp_push_shortstr(&mut open, ""); + open.push(0); + amqp_write_method(s, 0, 10, 40, &open).unwrap(); + assert_amqp_method(amqp_read_frame(reader).unwrap(), 0, 10, 41); + + let mut channel_open = Vec::new(); + amqp_push_longstr(&mut channel_open, ""); + amqp_write_method(s, 1, 20, 10, &channel_open).unwrap(); + assert_amqp_method(amqp_read_frame(reader).unwrap(), 1, 20, 11); + } + fn form_escape(input: &str) -> String { let mut out = String::new(); for b in input.bytes() {