From 030b0548433ed167b2ea0de4ea70d4616ec44a33 Mon Sep 17 00:00:00 2001 From: elipeter Date: Wed, 27 May 2026 13:42:23 -0500 Subject: [PATCH] refactor(dynamic): extend Rabbit AMQP protocol emulator with exchange/queue management, publisher confirms, nack/reject handling, and enhanced test coverage --- src/dynamic/lang/js_shared.rs | 52 ++++ src/dynamic/stubs/broker.rs | 497 +++++++++++++++++++++++++++++--- tests/message_handler_corpus.rs | 130 +++++++++ tests/phase21_corpus.rs | 25 ++ 4 files changed, 667 insertions(+), 37 deletions(-) diff --git a/src/dynamic/lang/js_shared.rs b/src/dynamic/lang/js_shared.rs index 79c4b66d..f7dab2f5 100644 --- a/src/dynamic/lang/js_shared.rs +++ b/src/dynamic/lang/js_shared.rs @@ -1165,6 +1165,57 @@ if (_h == null) {{ process.stderr.write('NYX_RESOLVER_NOT_FOUND: ' + {handler:?} + '\n'); process.exit(78); }} +async function _nyxTryApolloServer(typeName, fieldName, resolver) {{ + let ApolloServer; + let needsStart = true; + try {{ + ApolloServer = require('@apollo/server').ApolloServer; + }} catch (_) {{ + try {{ + ApolloServer = require('apollo-server').ApolloServer; + needsStart = false; + }} catch (_) {{ + return false; + }} + }} + if (typeof ApolloServer !== 'function') return false; + const safeField = /^[A-Za-z_][A-Za-z0-9_]*$/.test(fieldName) ? fieldName : 'nyxField'; + const typeDefs = 'type Query {{ ' + safeField + '(id: String, input: String): String }}'; + const resolvers = {{ + Query: {{}}, + }}; + resolvers.Query[safeField] = async function (parent, args, context, info) {{ + const value = await Promise.resolve(resolver( + parent, + Object.assign({{ id: payload, input: payload, value: payload }}, args || {{}}), + context || {{}}, + info || {{ fieldName: safeField, parentType: typeName }} + )); + return value == null ? null : String(value); + }}; + let server; + try {{ + server = new ApolloServer({{ typeDefs, resolvers }}); + if (needsStart && typeof server.start === 'function') await server.start(); + const raw = await server.executeOperation({{ + query: 'query($value: String) {{ ' + safeField + '(id: $value, input: $value) }}', + variables: {{ value: payload }}, + }}); + const result = raw && raw.body && raw.body.kind === 'single' ? raw.body.singleResult : raw; + if (result && result.errors && result.errors.length) return false; + if (result && result.data && result.data[safeField] != null) {{ + process.stdout.write(String(result.data[safeField]) + '\n'); + }} + return true; + }} catch (e) {{ + process.stderr.write('NYX_APOLLO_FALLBACK: ' + (e && e.message ? e.message : String(e)) + '\n'); + return false; + }} finally {{ + if (server && typeof server.stop === 'function') {{ + try {{ await server.stop(); }} catch (_) {{}} + }} + }} +}} async function _nyxTryGraphqlJs(typeName, fieldName, resolver) {{ let graphql; let buildSchema; @@ -1206,6 +1257,7 @@ async function _nyxTryGraphqlJs(typeName, fieldName, resolver) {{ }} (async () => {{ try {{ + if (await _nyxTryApolloServer({type_name:?}, {field:?}, _h)) return; if (await _nyxTryGraphqlJs({type_name:?}, {field:?}, _h)) return; // Apollo resolver shape: (parent, args, context, info). const _info = {{ fieldName: {field:?}, parentType: {type_name:?} }}; diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs index a11080b9..495ab120 100644 --- a/src/dynamic/stubs/broker.rs +++ b/src/dynamic/stubs/broker.rs @@ -8,6 +8,14 @@ //! delivery API used by today's message-handler harnesses; this //! provider is the shared recording and routing surface those snippets //! can use. +//! +//! The Rabbit provider intentionally implements a bounded AMQP 0-9-1 +//! contract rather than a full broker: connection/channel open, exchange +//! declare, queue declare/bind/delete, basic publish/get/consume/deliver, +//! qos, ack/nack/reject with requeue, cancel, publisher confirms, close, +//! and heartbeats. It does not emulate broker policies such as TLS, +//! federation, DLX, permissions, or exchange-type routing beyond direct +//! queue bindings. use super::{StubEvent, StubKind, StubProvider, monotonic_ns}; use std::collections::{BTreeMap, VecDeque}; @@ -715,6 +723,7 @@ struct RabbitAmqpState { queues: BTreeMap>, inflight: BTreeMap, consumers: BTreeMap>, + bindings: BTreeMap<(String, String), Vec>, } #[derive(Debug, Clone)] @@ -796,6 +805,8 @@ fn handle_rabbit_amqp_connection( } let mut owned_consumer_tags = Vec::new(); + let mut confirms_enabled = false; + let mut next_publish_tag = 0_u64; loop { let Some(frame) = amqp_read_frame(&mut reader) else { break; @@ -850,6 +861,17 @@ fn handle_rabbit_amqp_connection( break; } } + // exchange.declare + (40, 10) => { + if let Some(exchange) = amqp_exchange_declare_name(&frame.payload) + && let Ok(mut guard) = state.lock() + { + guard.bindings.entry((exchange, String::new())).or_default(); + } + if amqp_write_method(&mut writer, frame.channel, 40, 11, &[]).is_err() { + break; + } + } // basic.consume (60, 20) => { let Some((queue, requested_tag, no_ack)) = amqp_basic_consume_args(&frame.payload) @@ -890,7 +912,7 @@ fn handle_rabbit_amqp_connection( // basic.cancel (60, 30) => { if let Some(consumer_tag) = amqp_basic_cancel_tag(&frame.payload) { - rabbit_amqp_remove_consumers(&state, &[consumer_tag.clone()]); + rabbit_amqp_remove_consumers(&state, std::slice::from_ref(&consumer_tag)); if amqp_write_basic_cancel_ok(&mut writer, frame.channel, &consumer_tag) .is_err() { @@ -914,29 +936,68 @@ fn handle_rabbit_amqp_connection( break; } } + // queue.bind + (50, 20) => { + if let Some((queue, exchange, routing_key)) = amqp_queue_bind_args(&frame.payload) + && let Ok(mut guard) = state.lock() + { + guard + .bindings + .entry((exchange, routing_key)) + .or_default() + .push(queue); + } + if amqp_write_method(&mut writer, frame.channel, 50, 21, &[]).is_err() { + break; + } + } + // queue.delete + (50, 40) => { + let queue = amqp_queue_delete_name(&frame.payload).unwrap_or_default(); + let removed = if let Ok(mut guard) = state.lock() { + guard.queues.remove(&queue).map(|q| q.len()).unwrap_or(0) as u32 + } else { + 0 + }; + if amqp_write_queue_delete_ok(&mut writer, frame.channel, removed).is_err() { + break; + } + } // basic.publish (60, 40) => { - let routing_key = amqp_basic_publish_routing_key(&frame.payload) - .filter(|q| !q.is_empty()) - .unwrap_or_else(|| "default".to_owned()); + let Some((exchange, routing_key)) = amqp_basic_publish_args(&frame.payload) else { + continue; + }; + let routing_key = if routing_key.is_empty() { + "default".to_owned() + } else { + routing_key + }; let Some(body) = amqp_read_content_body(&mut reader, frame.channel) else { break; }; let payload = String::from_utf8_lossy(&body).into_owned(); - if !rabbit_amqp_deliver_to_consumer( - &state, - log_path, - &routing_key, - payload.as_bytes(), - ) && let Ok(mut guard) = state.lock() - { - guard - .queues - .entry(routing_key.clone()) - .or_default() - .push_back(payload.clone()); + let destinations = + rabbit_amqp_publish_destinations(&state, &exchange, &routing_key); + for destination in &destinations { + if !rabbit_amqp_deliver_to_consumer( + &state, + log_path, + destination, + payload.as_bytes(), + ) { + rabbit_amqp_enqueue(&state, destination, &payload); + } } let _ = append_broker_event(log_path, "publish", &routing_key, &payload); + if confirms_enabled { + next_publish_tag = next_publish_tag.saturating_add(1); + if amqp_write_basic_ack(&mut writer, frame.channel, next_publish_tag, false) + .is_err() + { + break; + } + } } // basic.get (60, 70) => { @@ -980,28 +1041,40 @@ fn handle_rabbit_amqp_connection( let Some((delivery_tag, multiple)) = amqp_basic_ack_tag(&frame.payload) else { continue; }; - let mut acked = Vec::new(); - if let Ok(mut guard) = state.lock() { - if multiple { - let tags: Vec = guard - .inflight - .keys() - .copied() - .filter(|tag| *tag <= delivery_tag) - .collect(); - for tag in tags { - if let Some((queue, _payload)) = guard.inflight.remove(&tag) { - acked.push((queue, tag)); - } - } - } else if let Some((queue, _payload)) = guard.inflight.remove(&delivery_tag) { - acked.push((queue, delivery_tag)); - } - } - for (queue, tag) in acked { + for (queue, tag) in rabbit_amqp_ack_deliveries(&state, delivery_tag, multiple) { let _ = append_broker_event(log_path, "ack", &queue, &tag.to_string()); } } + // basic.reject + (60, 90) => { + let Some((delivery_tag, requeue)) = amqp_basic_reject_args(&frame.payload) else { + continue; + }; + for (queue, tag) in + rabbit_amqp_nack_deliveries(&state, delivery_tag, false, requeue) + { + let _ = append_broker_event(log_path, "nack", &queue, &tag.to_string()); + } + } + // basic.nack + (60, 120) => { + let Some((delivery_tag, multiple, requeue)) = amqp_basic_nack_args(&frame.payload) + else { + continue; + }; + for (queue, tag) in + rabbit_amqp_nack_deliveries(&state, delivery_tag, multiple, requeue) + { + let _ = append_broker_event(log_path, "nack", &queue, &tag.to_string()); + } + } + // confirm.select + (85, 10) => { + confirms_enabled = true; + if amqp_write_method(&mut writer, frame.channel, 85, 11, &[]).is_err() { + break; + } + } _ => {} } } @@ -1068,6 +1141,28 @@ fn amqp_write_queue_declare_ok( amqp_write_method(writer, channel, 50, 11, &args) } +fn amqp_write_queue_delete_ok( + writer: &mut TcpStream, + channel: u16, + message_count: u32, +) -> std::io::Result<()> { + let mut args = Vec::new(); + amqp_push_u32(&mut args, message_count); + amqp_write_method(writer, channel, 50, 41, &args) +} + +fn amqp_write_basic_ack( + writer: &mut TcpStream, + channel: u16, + delivery_tag: u64, + multiple: bool, +) -> std::io::Result<()> { + let mut args = Vec::new(); + amqp_push_u64(&mut args, delivery_tag); + args.push(u8::from(multiple)); + amqp_write_method(writer, channel, 60, 80, &args) +} + fn amqp_write_basic_get_ok( writer: &mut TcpStream, channel: u16, @@ -1216,13 +1311,35 @@ fn amqp_queue_declare_name(payload: &[u8]) -> Option { amqp_take_shortstr(payload, &mut idx) } -fn amqp_basic_publish_routing_key(payload: &[u8]) -> Option { +fn amqp_exchange_declare_name(payload: &[u8]) -> Option { let mut idx = 4; amqp_take_u16(payload, &mut idx)?; - let _exchange = amqp_take_shortstr(payload, &mut idx)?; amqp_take_shortstr(payload, &mut idx) } +fn amqp_queue_bind_args(payload: &[u8]) -> Option<(String, String, String)> { + let mut idx = 4; + amqp_take_u16(payload, &mut idx)?; + let queue = amqp_take_shortstr(payload, &mut idx)?; + let exchange = amqp_take_shortstr(payload, &mut idx)?; + let routing_key = amqp_take_shortstr(payload, &mut idx)?; + Some((queue, exchange, routing_key)) +} + +fn amqp_queue_delete_name(payload: &[u8]) -> Option { + let mut idx = 4; + amqp_take_u16(payload, &mut idx)?; + amqp_take_shortstr(payload, &mut idx) +} + +fn amqp_basic_publish_args(payload: &[u8]) -> Option<(String, String)> { + let mut idx = 4; + amqp_take_u16(payload, &mut idx)?; + let exchange = amqp_take_shortstr(payload, &mut idx)?; + let routing_key = amqp_take_shortstr(payload, &mut idx)?; + Some((exchange, routing_key)) +} + fn amqp_basic_get_queue(payload: &[u8]) -> Option { let mut idx = 4; amqp_take_u16(payload, &mut idx)?; @@ -1250,6 +1367,20 @@ fn amqp_basic_ack_tag(payload: &[u8]) -> Option<(u64, bool)> { Some((tag, bits & 1 != 0)) } +fn amqp_basic_reject_args(payload: &[u8]) -> Option<(u64, bool)> { + let mut idx = 4; + let tag = amqp_take_u64(payload, &mut idx)?; + let bits = payload.get(idx).copied().unwrap_or(0); + Some((tag, bits & 1 != 0)) +} + +fn amqp_basic_nack_args(payload: &[u8]) -> Option<(u64, bool, bool)> { + let mut idx = 4; + let tag = amqp_take_u64(payload, &mut idx)?; + let bits = payload.get(idx).copied().unwrap_or(0); + Some((tag, bits & 1 != 0, bits & 0b10 != 0)) +} + fn amqp_take_u16(payload: &[u8], idx: &mut usize) -> Option { let end = *idx + 2; let bytes: [u8; 2] = payload.get(*idx..end)?.try_into().ok()?; @@ -1353,6 +1484,102 @@ fn rabbit_amqp_deliver_to_consumer( } } +fn rabbit_amqp_publish_destinations( + state: &Arc>, + exchange: &str, + routing_key: &str, +) -> Vec { + if exchange.is_empty() { + return vec![routing_key.to_owned()]; + } + let mut out = state + .lock() + .ok() + .and_then(|guard| { + guard + .bindings + .get(&(exchange.to_owned(), routing_key.to_owned())) + .cloned() + }) + .unwrap_or_default(); + if out.is_empty() { + out.push(routing_key.to_owned()); + } + out.sort(); + out.dedup(); + out +} + +fn rabbit_amqp_enqueue(state: &Arc>, queue: &str, payload: &str) { + if let Ok(mut guard) = state.lock() { + guard + .queues + .entry(queue.to_owned()) + .or_default() + .push_back(payload.to_owned()); + } +} + +fn rabbit_amqp_ack_deliveries( + state: &Arc>, + delivery_tag: u64, + multiple: bool, +) -> Vec<(String, u64)> { + let mut acked = Vec::new(); + if let Ok(mut guard) = state.lock() { + if multiple { + let tags: Vec = guard + .inflight + .keys() + .copied() + .filter(|tag| *tag <= delivery_tag) + .collect(); + for tag in tags { + if let Some((queue, _payload)) = guard.inflight.remove(&tag) { + acked.push((queue, tag)); + } + } + } else if let Some((queue, _payload)) = guard.inflight.remove(&delivery_tag) { + acked.push((queue, delivery_tag)); + } + } + acked +} + +fn rabbit_amqp_nack_deliveries( + state: &Arc>, + delivery_tag: u64, + multiple: bool, + requeue: bool, +) -> Vec<(String, u64)> { + let mut nacked = Vec::new(); + if let Ok(mut guard) = state.lock() { + let tags: Vec = if multiple { + guard + .inflight + .keys() + .copied() + .filter(|tag| *tag <= delivery_tag) + .collect() + } else { + vec![delivery_tag] + }; + for tag in tags { + if let Some((queue, payload)) = guard.inflight.remove(&tag) { + if requeue { + guard + .queues + .entry(queue.clone()) + .or_default() + .push_front(payload); + } + nacked.push((queue, tag)); + } + } + } + nacked +} + fn rabbit_amqp_remove_consumers(state: &Arc>, consumer_tags: &[String]) { if consumer_tags.is_empty() { return; @@ -2234,6 +2461,202 @@ mod tests { assert_eq!(events[1].detail.get("payload").unwrap(), "async payload"); } + #[test] + fn rabbit_amqp_exchange_bind_and_publisher_confirm_route_to_queue() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://rabbit" { + return; + } + let port: u16 = endpoint + .trim_start_matches("amqp://127.0.0.1:") + .split('/') + .next() + .unwrap() + .parse() + .unwrap(); + let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap(); + let mut reader = BufReader::new(s.try_clone().unwrap()); + amqp_test_open_channel(&mut s, &mut reader); + + let mut exchange = Vec::new(); + amqp_push_u16(&mut exchange, 0); + amqp_push_shortstr(&mut exchange, "events"); + amqp_push_shortstr(&mut exchange, "direct"); + exchange.push(0); + amqp_push_table_empty(&mut exchange); + amqp_write_method(&mut s, 1, 40, 10, &exchange).unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 40, 11); + + let mut declare = Vec::new(); + amqp_push_u16(&mut declare, 0); + amqp_push_shortstr(&mut declare, "work"); + declare.push(0); + amqp_push_table_empty(&mut declare); + amqp_write_method(&mut s, 1, 50, 10, &declare).unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 50, 11); + + let mut bind = Vec::new(); + amqp_push_u16(&mut bind, 0); + amqp_push_shortstr(&mut bind, "work"); + amqp_push_shortstr(&mut bind, "events"); + amqp_push_shortstr(&mut bind, "orders.created"); + bind.push(0); + amqp_push_table_empty(&mut bind); + amqp_write_method(&mut s, 1, 50, 20, &bind).unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 50, 21); + + amqp_write_method(&mut s, 1, 85, 10, &[0]).unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 85, 11); + + let mut publish = Vec::new(); + amqp_push_u16(&mut publish, 0); + amqp_push_shortstr(&mut publish, "events"); + amqp_push_shortstr(&mut publish, "orders.created"); + publish.push(0); + amqp_write_method(&mut s, 1, 60, 40, &publish).unwrap(); + amqp_write_content(&mut s, 1, b"exchange payload").unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 60, 80); + + let mut get = Vec::new(); + amqp_push_u16(&mut get, 0); + amqp_push_shortstr(&mut get, "work"); + get.push(0); + amqp_write_method(&mut s, 1, 60, 70, &get).unwrap(); + let get_ok = amqp_read_frame(&mut reader).unwrap(); + assert_amqp_method_ref(&get_ok, 1, 60, 71); + let mut idx = 4; + let delivery_tag = amqp_take_u64(&get_ok.payload, &mut idx).unwrap(); + let header = amqp_read_frame(&mut reader).unwrap(); + assert_eq!(header.frame_type, AMQP_FRAME_HEADER); + let body = amqp_read_frame(&mut reader).unwrap(); + assert_eq!(body.frame_type, AMQP_FRAME_BODY); + assert_eq!(body.payload, b"exchange payload"); + + let mut ack = Vec::new(); + amqp_push_u64(&mut ack, delivery_tag); + ack.push(0); + amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap(); + std::thread::sleep(Duration::from_millis(25)); + + let events = stub.drain_events(); + let actions: Vec<&str> = events + .iter() + .map(|ev| ev.detail.get("action").unwrap().as_str()) + .collect(); + assert_eq!(actions, vec!["publish", "deliver", "ack"]); + assert_eq!( + events[0].detail.get("destination").unwrap(), + "orders.created" + ); + assert_eq!(events[1].detail.get("destination").unwrap(), "work"); + assert_eq!(events[1].detail.get("payload").unwrap(), "exchange payload"); + } + + #[test] + fn rabbit_amqp_basic_nack_requeues_delivery() { + let dir = TempDir::new().unwrap(); + let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap(); + let endpoint = stub.endpoint(); + if endpoint == "loopback://rabbit" { + return; + } + let port: u16 = endpoint + .trim_start_matches("amqp://127.0.0.1:") + .split('/') + .next() + .unwrap() + .parse() + .unwrap(); + let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap(); + let mut reader = BufReader::new(s.try_clone().unwrap()); + amqp_test_open_channel(&mut s, &mut reader); + + let mut declare = Vec::new(); + amqp_push_u16(&mut declare, 0); + amqp_push_shortstr(&mut declare, "work"); + declare.push(0); + amqp_push_table_empty(&mut declare); + amqp_write_method(&mut s, 1, 50, 10, &declare).unwrap(); + assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 50, 11); + + let mut publish = Vec::new(); + amqp_push_u16(&mut publish, 0); + amqp_push_shortstr(&mut publish, ""); + amqp_push_shortstr(&mut publish, "work"); + publish.push(0); + amqp_write_method(&mut s, 1, 60, 40, &publish).unwrap(); + amqp_write_content(&mut s, 1, b"retry payload").unwrap(); + + let mut get = Vec::new(); + amqp_push_u16(&mut get, 0); + amqp_push_shortstr(&mut get, "work"); + get.push(0); + amqp_write_method(&mut s, 1, 60, 70, &get).unwrap(); + let first_get_ok = amqp_read_frame(&mut reader).unwrap(); + assert_amqp_method_ref(&first_get_ok, 1, 60, 71); + let mut idx = 4; + let first_delivery_tag = amqp_take_u64(&first_get_ok.payload, &mut idx).unwrap(); + assert_eq!( + amqp_read_frame(&mut reader).unwrap().frame_type, + AMQP_FRAME_HEADER + ); + assert_eq!( + amqp_read_frame(&mut reader).unwrap().payload, + b"retry payload" + ); + + let mut nack = Vec::new(); + amqp_push_u64(&mut nack, first_delivery_tag); + nack.push(0b10); + amqp_write_method(&mut s, 1, 60, 120, &nack).unwrap(); + + let mut get_again = Vec::new(); + amqp_push_u16(&mut get_again, 0); + amqp_push_shortstr(&mut get_again, "work"); + get_again.push(0); + amqp_write_method(&mut s, 1, 60, 70, &get_again).unwrap(); + let second_get_ok = amqp_read_frame(&mut reader).unwrap(); + assert_amqp_method_ref(&second_get_ok, 1, 60, 71); + let mut idx = 4; + let second_delivery_tag = amqp_take_u64(&second_get_ok.payload, &mut idx).unwrap(); + assert_ne!(first_delivery_tag, second_delivery_tag); + assert_eq!( + amqp_read_frame(&mut reader).unwrap().frame_type, + AMQP_FRAME_HEADER + ); + assert_eq!( + amqp_read_frame(&mut reader).unwrap().payload, + b"retry payload" + ); + + let mut ack = Vec::new(); + amqp_push_u64(&mut ack, second_delivery_tag); + ack.push(0); + amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap(); + std::thread::sleep(Duration::from_millis(25)); + + let events = stub.drain_events(); + let actions: Vec<&str> = events + .iter() + .map(|ev| ev.detail.get("action").unwrap().as_str()) + .collect(); + assert_eq!( + actions, + vec!["publish", "deliver", "nack", "deliver", "ack"] + ); + assert_eq!( + events[2].detail.get("payload").unwrap(), + &first_delivery_tag.to_string() + ); + assert_eq!(events[3].detail.get("payload").unwrap(), "retry payload"); + assert_eq!( + events[4].detail.get("payload").unwrap(), + &second_delivery_tag.to_string() + ); + } + #[test] fn nats_protocol_server_records_publish_deliver() { let dir = TempDir::new().unwrap(); diff --git a/tests/message_handler_corpus.rs b/tests/message_handler_corpus.rs index 3c5f4dfc..bb38478d 100644 --- a/tests/message_handler_corpus.rs +++ b/tests/message_handler_corpus.rs @@ -101,6 +101,13 @@ fn make_spec_with_adapter( spec } +fn assert_extra_file_contains(files: &[(String, String)], path: &str, needle: &str, context: &str) { + assert!( + files.iter().any(|(p, c)| p == path && c.contains(needle)), + "{context} must stage {path} containing {needle:?}; got {files:?}" + ); +} + // ── Supported-set assertions ────────────────────────────────────────────────── #[test] @@ -354,6 +361,129 @@ fn message_handler_java_rabbit_tries_real_client_before_fallbacks() { ); } +#[test] +fn message_handler_real_client_runtime_deps_are_staged_from_adapter() { + let py_kafka = lang::emit(&make_spec_with_adapter( + Lang::Python, + "orders", + "handler", + entry_file("kafka_python"), + "kafka-python", + )) + .expect("emit kafka-python"); + assert_extra_file_contains( + &py_kafka.extra_files, + "requirements.txt", + "kafka-python", + "kafka-python", + ); + + let py_pubsub = lang::emit(&make_spec_with_adapter( + Lang::Python, + "projects/p/subscriptions/s", + "callback", + entry_file("pubsub_python"), + "pubsub-python", + )) + .expect("emit pubsub-python"); + assert_extra_file_contains( + &py_pubsub.extra_files, + "requirements.txt", + "google-cloud-pubsub", + "pubsub-python", + ); + + let py_rabbit = lang::emit(&make_spec_with_adapter( + Lang::Python, + "work", + "on_message", + entry_file("rabbit_python"), + "rabbit-python", + )) + .expect("emit rabbit-python"); + assert_extra_file_contains( + &py_rabbit.extra_files, + "requirements.txt", + "pika", + "rabbit-python", + ); + + let node_sqs = lang::emit(&make_spec_with_adapter( + Lang::JavaScript, + "jobs", + "handler", + entry_file("sqs_node"), + "sqs-node", + )) + .expect("emit sqs-node"); + assert_extra_file_contains( + &node_sqs.extra_files, + "package.json", + "@aws-sdk/client-sqs", + "sqs-node", + ); + + let java_kafka = lang::emit(&make_spec_with_adapter( + Lang::Java, + "orders", + "onMessage", + entry_file("kafka_java"), + "kafka-java", + )) + .expect("emit kafka-java"); + assert_extra_file_contains( + &java_kafka.extra_files, + "pom.xml", + "kafka-clients", + "kafka-java", + ); + + let java_rabbit = lang::emit(&make_spec_with_adapter( + Lang::Java, + "work", + "onMessage", + entry_file("rabbit_java"), + "rabbit-java", + )) + .expect("emit rabbit-java"); + assert_extra_file_contains( + &java_rabbit.extra_files, + "pom.xml", + "amqp-client", + "rabbit-java", + ); + + let go_pubsub = lang::emit(&make_spec_with_adapter( + Lang::Go, + "my-sub", + "OnMessage", + entry_file("pubsub_go"), + "pubsub-go", + )) + .expect("emit pubsub-go"); + assert_extra_file_contains( + &go_pubsub.extra_files, + "go.mod", + "cloud.google.com/go/pubsub", + "pubsub-go", + ); + + let go_nats = lang::emit(&make_spec_with_adapter( + Lang::Go, + "events", + "OnMessage", + entry_file("nats_go"), + "nats-go", + )) + .expect("emit nats-go"); + assert_extra_file_contains( + &go_nats.extra_files, + "go.mod", + "github.com/nats-io/nats.go", + "nats-go", + ); +} + #[test] fn message_handler_go_pubsub_tries_real_client_before_fallbacks() { let spec = make_spec_with_adapter( diff --git a/tests/phase21_corpus.rs b/tests/phase21_corpus.rs index b8ab6e4f..97fcd8ac 100644 --- a/tests/phase21_corpus.rs +++ b/tests/phase21_corpus.rs @@ -752,8 +752,33 @@ fn graphql_resolver_js_harness_carries_sentinel_and_field() { let h = lang::emit(&spec).expect("emit ok"); assert!(h.source.contains("__NYX_GRAPHQL_RESOLVER__")); assert!(h.source.contains("\"resolveUser\"")); + assert!(h.source.contains("_nyxTryApolloServer")); + assert!(h.source.contains("require('@apollo/server')")); assert!(h.source.contains("_nyxTryGraphqlJs")); assert!(h.source.contains("require('graphql')")); + assert!( + h.source.find("_nyxTryApolloServer").unwrap() < h.source.find("_nyxTryGraphqlJs").unwrap(), + "Apollo Server should run before the GraphQL.js fallback" + ); +} + +#[test] +fn graphql_resolver_js_apollo_stages_runtime_deps() { + let spec = framework_bound_spec( + Lang::JavaScript, + EvEntryKind::GraphQLResolver { + type_name: "Query".into(), + field: "user".into(), + }, + "resolveUser", + "tests/dynamic_fixtures/graphql_resolver/apollo/vuln.js", + "graphql-apollo", + ); + let h = lang::emit(&spec).expect("emit ok"); + let package = extra_file_content(&h.extra_files, "package.json"); + assert!(package.contains("\"@apollo/server\"")); + assert!(package.contains("\"apollo-server\"")); + assert!(package.contains("\"graphql\"")); } #[test]