mirror of
https://github.com/elicpeter/nyx.git
synced 2026-06-30 20:39:39 +02:00
refactor(dynamic): extend Rabbit AMQP protocol emulator with exchange/queue management, publisher confirms, nack/reject handling, and enhanced test coverage
This commit is contained in:
parent
fd5e1f3e89
commit
030b054843
4 changed files with 667 additions and 37 deletions
|
|
@ -1165,6 +1165,57 @@ if (_h == null) {{
|
||||||
process.stderr.write('NYX_RESOLVER_NOT_FOUND: ' + {handler:?} + '\n');
|
process.stderr.write('NYX_RESOLVER_NOT_FOUND: ' + {handler:?} + '\n');
|
||||||
process.exit(78);
|
process.exit(78);
|
||||||
}}
|
}}
|
||||||
|
async function _nyxTryApolloServer(typeName, fieldName, resolver) {{
|
||||||
|
let ApolloServer;
|
||||||
|
let needsStart = true;
|
||||||
|
try {{
|
||||||
|
ApolloServer = require('@apollo/server').ApolloServer;
|
||||||
|
}} catch (_) {{
|
||||||
|
try {{
|
||||||
|
ApolloServer = require('apollo-server').ApolloServer;
|
||||||
|
needsStart = false;
|
||||||
|
}} catch (_) {{
|
||||||
|
return false;
|
||||||
|
}}
|
||||||
|
}}
|
||||||
|
if (typeof ApolloServer !== 'function') return false;
|
||||||
|
const safeField = /^[A-Za-z_][A-Za-z0-9_]*$/.test(fieldName) ? fieldName : 'nyxField';
|
||||||
|
const typeDefs = 'type Query {{ ' + safeField + '(id: String, input: String): String }}';
|
||||||
|
const resolvers = {{
|
||||||
|
Query: {{}},
|
||||||
|
}};
|
||||||
|
resolvers.Query[safeField] = async function (parent, args, context, info) {{
|
||||||
|
const value = await Promise.resolve(resolver(
|
||||||
|
parent,
|
||||||
|
Object.assign({{ id: payload, input: payload, value: payload }}, args || {{}}),
|
||||||
|
context || {{}},
|
||||||
|
info || {{ fieldName: safeField, parentType: typeName }}
|
||||||
|
));
|
||||||
|
return value == null ? null : String(value);
|
||||||
|
}};
|
||||||
|
let server;
|
||||||
|
try {{
|
||||||
|
server = new ApolloServer({{ typeDefs, resolvers }});
|
||||||
|
if (needsStart && typeof server.start === 'function') await server.start();
|
||||||
|
const raw = await server.executeOperation({{
|
||||||
|
query: 'query($value: String) {{ ' + safeField + '(id: $value, input: $value) }}',
|
||||||
|
variables: {{ value: payload }},
|
||||||
|
}});
|
||||||
|
const result = raw && raw.body && raw.body.kind === 'single' ? raw.body.singleResult : raw;
|
||||||
|
if (result && result.errors && result.errors.length) return false;
|
||||||
|
if (result && result.data && result.data[safeField] != null) {{
|
||||||
|
process.stdout.write(String(result.data[safeField]) + '\n');
|
||||||
|
}}
|
||||||
|
return true;
|
||||||
|
}} catch (e) {{
|
||||||
|
process.stderr.write('NYX_APOLLO_FALLBACK: ' + (e && e.message ? e.message : String(e)) + '\n');
|
||||||
|
return false;
|
||||||
|
}} finally {{
|
||||||
|
if (server && typeof server.stop === 'function') {{
|
||||||
|
try {{ await server.stop(); }} catch (_) {{}}
|
||||||
|
}}
|
||||||
|
}}
|
||||||
|
}}
|
||||||
async function _nyxTryGraphqlJs(typeName, fieldName, resolver) {{
|
async function _nyxTryGraphqlJs(typeName, fieldName, resolver) {{
|
||||||
let graphql;
|
let graphql;
|
||||||
let buildSchema;
|
let buildSchema;
|
||||||
|
|
@ -1206,6 +1257,7 @@ async function _nyxTryGraphqlJs(typeName, fieldName, resolver) {{
|
||||||
}}
|
}}
|
||||||
(async () => {{
|
(async () => {{
|
||||||
try {{
|
try {{
|
||||||
|
if (await _nyxTryApolloServer({type_name:?}, {field:?}, _h)) return;
|
||||||
if (await _nyxTryGraphqlJs({type_name:?}, {field:?}, _h)) return;
|
if (await _nyxTryGraphqlJs({type_name:?}, {field:?}, _h)) return;
|
||||||
// Apollo resolver shape: (parent, args, context, info).
|
// Apollo resolver shape: (parent, args, context, info).
|
||||||
const _info = {{ fieldName: {field:?}, parentType: {type_name:?} }};
|
const _info = {{ fieldName: {field:?}, parentType: {type_name:?} }};
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,14 @@
|
||||||
//! delivery API used by today's message-handler harnesses; this
|
//! delivery API used by today's message-handler harnesses; this
|
||||||
//! provider is the shared recording and routing surface those snippets
|
//! provider is the shared recording and routing surface those snippets
|
||||||
//! can use.
|
//! can use.
|
||||||
|
//!
|
||||||
|
//! The Rabbit provider intentionally implements a bounded AMQP 0-9-1
|
||||||
|
//! contract rather than a full broker: connection/channel open, exchange
|
||||||
|
//! declare, queue declare/bind/delete, basic publish/get/consume/deliver,
|
||||||
|
//! qos, ack/nack/reject with requeue, cancel, publisher confirms, close,
|
||||||
|
//! and heartbeats. It does not emulate broker policies such as TLS,
|
||||||
|
//! federation, DLX, permissions, or exchange-type routing beyond direct
|
||||||
|
//! queue bindings.
|
||||||
|
|
||||||
use super::{StubEvent, StubKind, StubProvider, monotonic_ns};
|
use super::{StubEvent, StubKind, StubProvider, monotonic_ns};
|
||||||
use std::collections::{BTreeMap, VecDeque};
|
use std::collections::{BTreeMap, VecDeque};
|
||||||
|
|
@ -715,6 +723,7 @@ struct RabbitAmqpState {
|
||||||
queues: BTreeMap<String, VecDeque<String>>,
|
queues: BTreeMap<String, VecDeque<String>>,
|
||||||
inflight: BTreeMap<u64, (String, String)>,
|
inflight: BTreeMap<u64, (String, String)>,
|
||||||
consumers: BTreeMap<String, Vec<RabbitAmqpConsumer>>,
|
consumers: BTreeMap<String, Vec<RabbitAmqpConsumer>>,
|
||||||
|
bindings: BTreeMap<(String, String), Vec<String>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
|
@ -796,6 +805,8 @@ fn handle_rabbit_amqp_connection(
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut owned_consumer_tags = Vec::new();
|
let mut owned_consumer_tags = Vec::new();
|
||||||
|
let mut confirms_enabled = false;
|
||||||
|
let mut next_publish_tag = 0_u64;
|
||||||
loop {
|
loop {
|
||||||
let Some(frame) = amqp_read_frame(&mut reader) else {
|
let Some(frame) = amqp_read_frame(&mut reader) else {
|
||||||
break;
|
break;
|
||||||
|
|
@ -850,6 +861,17 @@ fn handle_rabbit_amqp_connection(
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// exchange.declare
|
||||||
|
(40, 10) => {
|
||||||
|
if let Some(exchange) = amqp_exchange_declare_name(&frame.payload)
|
||||||
|
&& let Ok(mut guard) = state.lock()
|
||||||
|
{
|
||||||
|
guard.bindings.entry((exchange, String::new())).or_default();
|
||||||
|
}
|
||||||
|
if amqp_write_method(&mut writer, frame.channel, 40, 11, &[]).is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
// basic.consume
|
// basic.consume
|
||||||
(60, 20) => {
|
(60, 20) => {
|
||||||
let Some((queue, requested_tag, no_ack)) = amqp_basic_consume_args(&frame.payload)
|
let Some((queue, requested_tag, no_ack)) = amqp_basic_consume_args(&frame.payload)
|
||||||
|
|
@ -890,7 +912,7 @@ fn handle_rabbit_amqp_connection(
|
||||||
// basic.cancel
|
// basic.cancel
|
||||||
(60, 30) => {
|
(60, 30) => {
|
||||||
if let Some(consumer_tag) = amqp_basic_cancel_tag(&frame.payload) {
|
if let Some(consumer_tag) = amqp_basic_cancel_tag(&frame.payload) {
|
||||||
rabbit_amqp_remove_consumers(&state, &[consumer_tag.clone()]);
|
rabbit_amqp_remove_consumers(&state, std::slice::from_ref(&consumer_tag));
|
||||||
if amqp_write_basic_cancel_ok(&mut writer, frame.channel, &consumer_tag)
|
if amqp_write_basic_cancel_ok(&mut writer, frame.channel, &consumer_tag)
|
||||||
.is_err()
|
.is_err()
|
||||||
{
|
{
|
||||||
|
|
@ -914,29 +936,68 @@ fn handle_rabbit_amqp_connection(
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// queue.bind
|
||||||
|
(50, 20) => {
|
||||||
|
if let Some((queue, exchange, routing_key)) = amqp_queue_bind_args(&frame.payload)
|
||||||
|
&& let Ok(mut guard) = state.lock()
|
||||||
|
{
|
||||||
|
guard
|
||||||
|
.bindings
|
||||||
|
.entry((exchange, routing_key))
|
||||||
|
.or_default()
|
||||||
|
.push(queue);
|
||||||
|
}
|
||||||
|
if amqp_write_method(&mut writer, frame.channel, 50, 21, &[]).is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// queue.delete
|
||||||
|
(50, 40) => {
|
||||||
|
let queue = amqp_queue_delete_name(&frame.payload).unwrap_or_default();
|
||||||
|
let removed = if let Ok(mut guard) = state.lock() {
|
||||||
|
guard.queues.remove(&queue).map(|q| q.len()).unwrap_or(0) as u32
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
if amqp_write_queue_delete_ok(&mut writer, frame.channel, removed).is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
// basic.publish
|
// basic.publish
|
||||||
(60, 40) => {
|
(60, 40) => {
|
||||||
let routing_key = amqp_basic_publish_routing_key(&frame.payload)
|
let Some((exchange, routing_key)) = amqp_basic_publish_args(&frame.payload) else {
|
||||||
.filter(|q| !q.is_empty())
|
continue;
|
||||||
.unwrap_or_else(|| "default".to_owned());
|
};
|
||||||
|
let routing_key = if routing_key.is_empty() {
|
||||||
|
"default".to_owned()
|
||||||
|
} else {
|
||||||
|
routing_key
|
||||||
|
};
|
||||||
let Some(body) = amqp_read_content_body(&mut reader, frame.channel) else {
|
let Some(body) = amqp_read_content_body(&mut reader, frame.channel) else {
|
||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
let payload = String::from_utf8_lossy(&body).into_owned();
|
let payload = String::from_utf8_lossy(&body).into_owned();
|
||||||
if !rabbit_amqp_deliver_to_consumer(
|
let destinations =
|
||||||
&state,
|
rabbit_amqp_publish_destinations(&state, &exchange, &routing_key);
|
||||||
log_path,
|
for destination in &destinations {
|
||||||
&routing_key,
|
if !rabbit_amqp_deliver_to_consumer(
|
||||||
payload.as_bytes(),
|
&state,
|
||||||
) && let Ok(mut guard) = state.lock()
|
log_path,
|
||||||
{
|
destination,
|
||||||
guard
|
payload.as_bytes(),
|
||||||
.queues
|
) {
|
||||||
.entry(routing_key.clone())
|
rabbit_amqp_enqueue(&state, destination, &payload);
|
||||||
.or_default()
|
}
|
||||||
.push_back(payload.clone());
|
|
||||||
}
|
}
|
||||||
let _ = append_broker_event(log_path, "publish", &routing_key, &payload);
|
let _ = append_broker_event(log_path, "publish", &routing_key, &payload);
|
||||||
|
if confirms_enabled {
|
||||||
|
next_publish_tag = next_publish_tag.saturating_add(1);
|
||||||
|
if amqp_write_basic_ack(&mut writer, frame.channel, next_publish_tag, false)
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// basic.get
|
// basic.get
|
||||||
(60, 70) => {
|
(60, 70) => {
|
||||||
|
|
@ -980,28 +1041,40 @@ fn handle_rabbit_amqp_connection(
|
||||||
let Some((delivery_tag, multiple)) = amqp_basic_ack_tag(&frame.payload) else {
|
let Some((delivery_tag, multiple)) = amqp_basic_ack_tag(&frame.payload) else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
let mut acked = Vec::new();
|
for (queue, tag) in rabbit_amqp_ack_deliveries(&state, delivery_tag, multiple) {
|
||||||
if let Ok(mut guard) = state.lock() {
|
|
||||||
if multiple {
|
|
||||||
let tags: Vec<u64> = guard
|
|
||||||
.inflight
|
|
||||||
.keys()
|
|
||||||
.copied()
|
|
||||||
.filter(|tag| *tag <= delivery_tag)
|
|
||||||
.collect();
|
|
||||||
for tag in tags {
|
|
||||||
if let Some((queue, _payload)) = guard.inflight.remove(&tag) {
|
|
||||||
acked.push((queue, tag));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if let Some((queue, _payload)) = guard.inflight.remove(&delivery_tag) {
|
|
||||||
acked.push((queue, delivery_tag));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (queue, tag) in acked {
|
|
||||||
let _ = append_broker_event(log_path, "ack", &queue, &tag.to_string());
|
let _ = append_broker_event(log_path, "ack", &queue, &tag.to_string());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// basic.reject
|
||||||
|
(60, 90) => {
|
||||||
|
let Some((delivery_tag, requeue)) = amqp_basic_reject_args(&frame.payload) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
for (queue, tag) in
|
||||||
|
rabbit_amqp_nack_deliveries(&state, delivery_tag, false, requeue)
|
||||||
|
{
|
||||||
|
let _ = append_broker_event(log_path, "nack", &queue, &tag.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// basic.nack
|
||||||
|
(60, 120) => {
|
||||||
|
let Some((delivery_tag, multiple, requeue)) = amqp_basic_nack_args(&frame.payload)
|
||||||
|
else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
for (queue, tag) in
|
||||||
|
rabbit_amqp_nack_deliveries(&state, delivery_tag, multiple, requeue)
|
||||||
|
{
|
||||||
|
let _ = append_broker_event(log_path, "nack", &queue, &tag.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// confirm.select
|
||||||
|
(85, 10) => {
|
||||||
|
confirms_enabled = true;
|
||||||
|
if amqp_write_method(&mut writer, frame.channel, 85, 11, &[]).is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1068,6 +1141,28 @@ fn amqp_write_queue_declare_ok(
|
||||||
amqp_write_method(writer, channel, 50, 11, &args)
|
amqp_write_method(writer, channel, 50, 11, &args)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn amqp_write_queue_delete_ok(
|
||||||
|
writer: &mut TcpStream,
|
||||||
|
channel: u16,
|
||||||
|
message_count: u32,
|
||||||
|
) -> std::io::Result<()> {
|
||||||
|
let mut args = Vec::new();
|
||||||
|
amqp_push_u32(&mut args, message_count);
|
||||||
|
amqp_write_method(writer, channel, 50, 41, &args)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn amqp_write_basic_ack(
|
||||||
|
writer: &mut TcpStream,
|
||||||
|
channel: u16,
|
||||||
|
delivery_tag: u64,
|
||||||
|
multiple: bool,
|
||||||
|
) -> std::io::Result<()> {
|
||||||
|
let mut args = Vec::new();
|
||||||
|
amqp_push_u64(&mut args, delivery_tag);
|
||||||
|
args.push(u8::from(multiple));
|
||||||
|
amqp_write_method(writer, channel, 60, 80, &args)
|
||||||
|
}
|
||||||
|
|
||||||
fn amqp_write_basic_get_ok(
|
fn amqp_write_basic_get_ok(
|
||||||
writer: &mut TcpStream,
|
writer: &mut TcpStream,
|
||||||
channel: u16,
|
channel: u16,
|
||||||
|
|
@ -1216,13 +1311,35 @@ fn amqp_queue_declare_name(payload: &[u8]) -> Option<String> {
|
||||||
amqp_take_shortstr(payload, &mut idx)
|
amqp_take_shortstr(payload, &mut idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn amqp_basic_publish_routing_key(payload: &[u8]) -> Option<String> {
|
fn amqp_exchange_declare_name(payload: &[u8]) -> Option<String> {
|
||||||
let mut idx = 4;
|
let mut idx = 4;
|
||||||
amqp_take_u16(payload, &mut idx)?;
|
amqp_take_u16(payload, &mut idx)?;
|
||||||
let _exchange = amqp_take_shortstr(payload, &mut idx)?;
|
|
||||||
amqp_take_shortstr(payload, &mut idx)
|
amqp_take_shortstr(payload, &mut idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn amqp_queue_bind_args(payload: &[u8]) -> Option<(String, String, String)> {
|
||||||
|
let mut idx = 4;
|
||||||
|
amqp_take_u16(payload, &mut idx)?;
|
||||||
|
let queue = amqp_take_shortstr(payload, &mut idx)?;
|
||||||
|
let exchange = amqp_take_shortstr(payload, &mut idx)?;
|
||||||
|
let routing_key = amqp_take_shortstr(payload, &mut idx)?;
|
||||||
|
Some((queue, exchange, routing_key))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn amqp_queue_delete_name(payload: &[u8]) -> Option<String> {
|
||||||
|
let mut idx = 4;
|
||||||
|
amqp_take_u16(payload, &mut idx)?;
|
||||||
|
amqp_take_shortstr(payload, &mut idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn amqp_basic_publish_args(payload: &[u8]) -> Option<(String, String)> {
|
||||||
|
let mut idx = 4;
|
||||||
|
amqp_take_u16(payload, &mut idx)?;
|
||||||
|
let exchange = amqp_take_shortstr(payload, &mut idx)?;
|
||||||
|
let routing_key = amqp_take_shortstr(payload, &mut idx)?;
|
||||||
|
Some((exchange, routing_key))
|
||||||
|
}
|
||||||
|
|
||||||
fn amqp_basic_get_queue(payload: &[u8]) -> Option<String> {
|
fn amqp_basic_get_queue(payload: &[u8]) -> Option<String> {
|
||||||
let mut idx = 4;
|
let mut idx = 4;
|
||||||
amqp_take_u16(payload, &mut idx)?;
|
amqp_take_u16(payload, &mut idx)?;
|
||||||
|
|
@ -1250,6 +1367,20 @@ fn amqp_basic_ack_tag(payload: &[u8]) -> Option<(u64, bool)> {
|
||||||
Some((tag, bits & 1 != 0))
|
Some((tag, bits & 1 != 0))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn amqp_basic_reject_args(payload: &[u8]) -> Option<(u64, bool)> {
|
||||||
|
let mut idx = 4;
|
||||||
|
let tag = amqp_take_u64(payload, &mut idx)?;
|
||||||
|
let bits = payload.get(idx).copied().unwrap_or(0);
|
||||||
|
Some((tag, bits & 1 != 0))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn amqp_basic_nack_args(payload: &[u8]) -> Option<(u64, bool, bool)> {
|
||||||
|
let mut idx = 4;
|
||||||
|
let tag = amqp_take_u64(payload, &mut idx)?;
|
||||||
|
let bits = payload.get(idx).copied().unwrap_or(0);
|
||||||
|
Some((tag, bits & 1 != 0, bits & 0b10 != 0))
|
||||||
|
}
|
||||||
|
|
||||||
fn amqp_take_u16(payload: &[u8], idx: &mut usize) -> Option<u16> {
|
fn amqp_take_u16(payload: &[u8], idx: &mut usize) -> Option<u16> {
|
||||||
let end = *idx + 2;
|
let end = *idx + 2;
|
||||||
let bytes: [u8; 2] = payload.get(*idx..end)?.try_into().ok()?;
|
let bytes: [u8; 2] = payload.get(*idx..end)?.try_into().ok()?;
|
||||||
|
|
@ -1353,6 +1484,102 @@ fn rabbit_amqp_deliver_to_consumer(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn rabbit_amqp_publish_destinations(
|
||||||
|
state: &Arc<Mutex<RabbitAmqpState>>,
|
||||||
|
exchange: &str,
|
||||||
|
routing_key: &str,
|
||||||
|
) -> Vec<String> {
|
||||||
|
if exchange.is_empty() {
|
||||||
|
return vec![routing_key.to_owned()];
|
||||||
|
}
|
||||||
|
let mut out = state
|
||||||
|
.lock()
|
||||||
|
.ok()
|
||||||
|
.and_then(|guard| {
|
||||||
|
guard
|
||||||
|
.bindings
|
||||||
|
.get(&(exchange.to_owned(), routing_key.to_owned()))
|
||||||
|
.cloned()
|
||||||
|
})
|
||||||
|
.unwrap_or_default();
|
||||||
|
if out.is_empty() {
|
||||||
|
out.push(routing_key.to_owned());
|
||||||
|
}
|
||||||
|
out.sort();
|
||||||
|
out.dedup();
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rabbit_amqp_enqueue(state: &Arc<Mutex<RabbitAmqpState>>, queue: &str, payload: &str) {
|
||||||
|
if let Ok(mut guard) = state.lock() {
|
||||||
|
guard
|
||||||
|
.queues
|
||||||
|
.entry(queue.to_owned())
|
||||||
|
.or_default()
|
||||||
|
.push_back(payload.to_owned());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rabbit_amqp_ack_deliveries(
|
||||||
|
state: &Arc<Mutex<RabbitAmqpState>>,
|
||||||
|
delivery_tag: u64,
|
||||||
|
multiple: bool,
|
||||||
|
) -> Vec<(String, u64)> {
|
||||||
|
let mut acked = Vec::new();
|
||||||
|
if let Ok(mut guard) = state.lock() {
|
||||||
|
if multiple {
|
||||||
|
let tags: Vec<u64> = guard
|
||||||
|
.inflight
|
||||||
|
.keys()
|
||||||
|
.copied()
|
||||||
|
.filter(|tag| *tag <= delivery_tag)
|
||||||
|
.collect();
|
||||||
|
for tag in tags {
|
||||||
|
if let Some((queue, _payload)) = guard.inflight.remove(&tag) {
|
||||||
|
acked.push((queue, tag));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if let Some((queue, _payload)) = guard.inflight.remove(&delivery_tag) {
|
||||||
|
acked.push((queue, delivery_tag));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
acked
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rabbit_amqp_nack_deliveries(
|
||||||
|
state: &Arc<Mutex<RabbitAmqpState>>,
|
||||||
|
delivery_tag: u64,
|
||||||
|
multiple: bool,
|
||||||
|
requeue: bool,
|
||||||
|
) -> Vec<(String, u64)> {
|
||||||
|
let mut nacked = Vec::new();
|
||||||
|
if let Ok(mut guard) = state.lock() {
|
||||||
|
let tags: Vec<u64> = if multiple {
|
||||||
|
guard
|
||||||
|
.inflight
|
||||||
|
.keys()
|
||||||
|
.copied()
|
||||||
|
.filter(|tag| *tag <= delivery_tag)
|
||||||
|
.collect()
|
||||||
|
} else {
|
||||||
|
vec![delivery_tag]
|
||||||
|
};
|
||||||
|
for tag in tags {
|
||||||
|
if let Some((queue, payload)) = guard.inflight.remove(&tag) {
|
||||||
|
if requeue {
|
||||||
|
guard
|
||||||
|
.queues
|
||||||
|
.entry(queue.clone())
|
||||||
|
.or_default()
|
||||||
|
.push_front(payload);
|
||||||
|
}
|
||||||
|
nacked.push((queue, tag));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nacked
|
||||||
|
}
|
||||||
|
|
||||||
fn rabbit_amqp_remove_consumers(state: &Arc<Mutex<RabbitAmqpState>>, consumer_tags: &[String]) {
|
fn rabbit_amqp_remove_consumers(state: &Arc<Mutex<RabbitAmqpState>>, consumer_tags: &[String]) {
|
||||||
if consumer_tags.is_empty() {
|
if consumer_tags.is_empty() {
|
||||||
return;
|
return;
|
||||||
|
|
@ -2234,6 +2461,202 @@ mod tests {
|
||||||
assert_eq!(events[1].detail.get("payload").unwrap(), "async payload");
|
assert_eq!(events[1].detail.get("payload").unwrap(), "async payload");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn rabbit_amqp_exchange_bind_and_publisher_confirm_route_to_queue() {
|
||||||
|
let dir = TempDir::new().unwrap();
|
||||||
|
let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap();
|
||||||
|
let endpoint = stub.endpoint();
|
||||||
|
if endpoint == "loopback://rabbit" {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let port: u16 = endpoint
|
||||||
|
.trim_start_matches("amqp://127.0.0.1:")
|
||||||
|
.split('/')
|
||||||
|
.next()
|
||||||
|
.unwrap()
|
||||||
|
.parse()
|
||||||
|
.unwrap();
|
||||||
|
let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
|
||||||
|
let mut reader = BufReader::new(s.try_clone().unwrap());
|
||||||
|
amqp_test_open_channel(&mut s, &mut reader);
|
||||||
|
|
||||||
|
let mut exchange = Vec::new();
|
||||||
|
amqp_push_u16(&mut exchange, 0);
|
||||||
|
amqp_push_shortstr(&mut exchange, "events");
|
||||||
|
amqp_push_shortstr(&mut exchange, "direct");
|
||||||
|
exchange.push(0);
|
||||||
|
amqp_push_table_empty(&mut exchange);
|
||||||
|
amqp_write_method(&mut s, 1, 40, 10, &exchange).unwrap();
|
||||||
|
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 40, 11);
|
||||||
|
|
||||||
|
let mut declare = Vec::new();
|
||||||
|
amqp_push_u16(&mut declare, 0);
|
||||||
|
amqp_push_shortstr(&mut declare, "work");
|
||||||
|
declare.push(0);
|
||||||
|
amqp_push_table_empty(&mut declare);
|
||||||
|
amqp_write_method(&mut s, 1, 50, 10, &declare).unwrap();
|
||||||
|
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 50, 11);
|
||||||
|
|
||||||
|
let mut bind = Vec::new();
|
||||||
|
amqp_push_u16(&mut bind, 0);
|
||||||
|
amqp_push_shortstr(&mut bind, "work");
|
||||||
|
amqp_push_shortstr(&mut bind, "events");
|
||||||
|
amqp_push_shortstr(&mut bind, "orders.created");
|
||||||
|
bind.push(0);
|
||||||
|
amqp_push_table_empty(&mut bind);
|
||||||
|
amqp_write_method(&mut s, 1, 50, 20, &bind).unwrap();
|
||||||
|
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 50, 21);
|
||||||
|
|
||||||
|
amqp_write_method(&mut s, 1, 85, 10, &[0]).unwrap();
|
||||||
|
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 85, 11);
|
||||||
|
|
||||||
|
let mut publish = Vec::new();
|
||||||
|
amqp_push_u16(&mut publish, 0);
|
||||||
|
amqp_push_shortstr(&mut publish, "events");
|
||||||
|
amqp_push_shortstr(&mut publish, "orders.created");
|
||||||
|
publish.push(0);
|
||||||
|
amqp_write_method(&mut s, 1, 60, 40, &publish).unwrap();
|
||||||
|
amqp_write_content(&mut s, 1, b"exchange payload").unwrap();
|
||||||
|
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 60, 80);
|
||||||
|
|
||||||
|
let mut get = Vec::new();
|
||||||
|
amqp_push_u16(&mut get, 0);
|
||||||
|
amqp_push_shortstr(&mut get, "work");
|
||||||
|
get.push(0);
|
||||||
|
amqp_write_method(&mut s, 1, 60, 70, &get).unwrap();
|
||||||
|
let get_ok = amqp_read_frame(&mut reader).unwrap();
|
||||||
|
assert_amqp_method_ref(&get_ok, 1, 60, 71);
|
||||||
|
let mut idx = 4;
|
||||||
|
let delivery_tag = amqp_take_u64(&get_ok.payload, &mut idx).unwrap();
|
||||||
|
let header = amqp_read_frame(&mut reader).unwrap();
|
||||||
|
assert_eq!(header.frame_type, AMQP_FRAME_HEADER);
|
||||||
|
let body = amqp_read_frame(&mut reader).unwrap();
|
||||||
|
assert_eq!(body.frame_type, AMQP_FRAME_BODY);
|
||||||
|
assert_eq!(body.payload, b"exchange payload");
|
||||||
|
|
||||||
|
let mut ack = Vec::new();
|
||||||
|
amqp_push_u64(&mut ack, delivery_tag);
|
||||||
|
ack.push(0);
|
||||||
|
amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap();
|
||||||
|
std::thread::sleep(Duration::from_millis(25));
|
||||||
|
|
||||||
|
let events = stub.drain_events();
|
||||||
|
let actions: Vec<&str> = events
|
||||||
|
.iter()
|
||||||
|
.map(|ev| ev.detail.get("action").unwrap().as_str())
|
||||||
|
.collect();
|
||||||
|
assert_eq!(actions, vec!["publish", "deliver", "ack"]);
|
||||||
|
assert_eq!(
|
||||||
|
events[0].detail.get("destination").unwrap(),
|
||||||
|
"orders.created"
|
||||||
|
);
|
||||||
|
assert_eq!(events[1].detail.get("destination").unwrap(), "work");
|
||||||
|
assert_eq!(events[1].detail.get("payload").unwrap(), "exchange payload");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn rabbit_amqp_basic_nack_requeues_delivery() {
|
||||||
|
let dir = TempDir::new().unwrap();
|
||||||
|
let stub = BrokerStub::start(StubKind::Rabbit, dir.path()).unwrap();
|
||||||
|
let endpoint = stub.endpoint();
|
||||||
|
if endpoint == "loopback://rabbit" {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let port: u16 = endpoint
|
||||||
|
.trim_start_matches("amqp://127.0.0.1:")
|
||||||
|
.split('/')
|
||||||
|
.next()
|
||||||
|
.unwrap()
|
||||||
|
.parse()
|
||||||
|
.unwrap();
|
||||||
|
let mut s = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
|
||||||
|
let mut reader = BufReader::new(s.try_clone().unwrap());
|
||||||
|
amqp_test_open_channel(&mut s, &mut reader);
|
||||||
|
|
||||||
|
let mut declare = Vec::new();
|
||||||
|
amqp_push_u16(&mut declare, 0);
|
||||||
|
amqp_push_shortstr(&mut declare, "work");
|
||||||
|
declare.push(0);
|
||||||
|
amqp_push_table_empty(&mut declare);
|
||||||
|
amqp_write_method(&mut s, 1, 50, 10, &declare).unwrap();
|
||||||
|
assert_amqp_method(amqp_read_frame(&mut reader).unwrap(), 1, 50, 11);
|
||||||
|
|
||||||
|
let mut publish = Vec::new();
|
||||||
|
amqp_push_u16(&mut publish, 0);
|
||||||
|
amqp_push_shortstr(&mut publish, "");
|
||||||
|
amqp_push_shortstr(&mut publish, "work");
|
||||||
|
publish.push(0);
|
||||||
|
amqp_write_method(&mut s, 1, 60, 40, &publish).unwrap();
|
||||||
|
amqp_write_content(&mut s, 1, b"retry payload").unwrap();
|
||||||
|
|
||||||
|
let mut get = Vec::new();
|
||||||
|
amqp_push_u16(&mut get, 0);
|
||||||
|
amqp_push_shortstr(&mut get, "work");
|
||||||
|
get.push(0);
|
||||||
|
amqp_write_method(&mut s, 1, 60, 70, &get).unwrap();
|
||||||
|
let first_get_ok = amqp_read_frame(&mut reader).unwrap();
|
||||||
|
assert_amqp_method_ref(&first_get_ok, 1, 60, 71);
|
||||||
|
let mut idx = 4;
|
||||||
|
let first_delivery_tag = amqp_take_u64(&first_get_ok.payload, &mut idx).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
amqp_read_frame(&mut reader).unwrap().frame_type,
|
||||||
|
AMQP_FRAME_HEADER
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
amqp_read_frame(&mut reader).unwrap().payload,
|
||||||
|
b"retry payload"
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut nack = Vec::new();
|
||||||
|
amqp_push_u64(&mut nack, first_delivery_tag);
|
||||||
|
nack.push(0b10);
|
||||||
|
amqp_write_method(&mut s, 1, 60, 120, &nack).unwrap();
|
||||||
|
|
||||||
|
let mut get_again = Vec::new();
|
||||||
|
amqp_push_u16(&mut get_again, 0);
|
||||||
|
amqp_push_shortstr(&mut get_again, "work");
|
||||||
|
get_again.push(0);
|
||||||
|
amqp_write_method(&mut s, 1, 60, 70, &get_again).unwrap();
|
||||||
|
let second_get_ok = amqp_read_frame(&mut reader).unwrap();
|
||||||
|
assert_amqp_method_ref(&second_get_ok, 1, 60, 71);
|
||||||
|
let mut idx = 4;
|
||||||
|
let second_delivery_tag = amqp_take_u64(&second_get_ok.payload, &mut idx).unwrap();
|
||||||
|
assert_ne!(first_delivery_tag, second_delivery_tag);
|
||||||
|
assert_eq!(
|
||||||
|
amqp_read_frame(&mut reader).unwrap().frame_type,
|
||||||
|
AMQP_FRAME_HEADER
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
amqp_read_frame(&mut reader).unwrap().payload,
|
||||||
|
b"retry payload"
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut ack = Vec::new();
|
||||||
|
amqp_push_u64(&mut ack, second_delivery_tag);
|
||||||
|
ack.push(0);
|
||||||
|
amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap();
|
||||||
|
std::thread::sleep(Duration::from_millis(25));
|
||||||
|
|
||||||
|
let events = stub.drain_events();
|
||||||
|
let actions: Vec<&str> = events
|
||||||
|
.iter()
|
||||||
|
.map(|ev| ev.detail.get("action").unwrap().as_str())
|
||||||
|
.collect();
|
||||||
|
assert_eq!(
|
||||||
|
actions,
|
||||||
|
vec!["publish", "deliver", "nack", "deliver", "ack"]
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
events[2].detail.get("payload").unwrap(),
|
||||||
|
&first_delivery_tag.to_string()
|
||||||
|
);
|
||||||
|
assert_eq!(events[3].detail.get("payload").unwrap(), "retry payload");
|
||||||
|
assert_eq!(
|
||||||
|
events[4].detail.get("payload").unwrap(),
|
||||||
|
&second_delivery_tag.to_string()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn nats_protocol_server_records_publish_deliver() {
|
fn nats_protocol_server_records_publish_deliver() {
|
||||||
let dir = TempDir::new().unwrap();
|
let dir = TempDir::new().unwrap();
|
||||||
|
|
|
||||||
|
|
@ -101,6 +101,13 @@ fn make_spec_with_adapter(
|
||||||
spec
|
spec
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn assert_extra_file_contains(files: &[(String, String)], path: &str, needle: &str, context: &str) {
|
||||||
|
assert!(
|
||||||
|
files.iter().any(|(p, c)| p == path && c.contains(needle)),
|
||||||
|
"{context} must stage {path} containing {needle:?}; got {files:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// ── Supported-set assertions ──────────────────────────────────────────────────
|
// ── Supported-set assertions ──────────────────────────────────────────────────
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
@ -354,6 +361,129 @@ fn message_handler_java_rabbit_tries_real_client_before_fallbacks() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn message_handler_real_client_runtime_deps_are_staged_from_adapter() {
|
||||||
|
let py_kafka = lang::emit(&make_spec_with_adapter(
|
||||||
|
Lang::Python,
|
||||||
|
"orders",
|
||||||
|
"handler",
|
||||||
|
entry_file("kafka_python"),
|
||||||
|
"kafka-python",
|
||||||
|
))
|
||||||
|
.expect("emit kafka-python");
|
||||||
|
assert_extra_file_contains(
|
||||||
|
&py_kafka.extra_files,
|
||||||
|
"requirements.txt",
|
||||||
|
"kafka-python",
|
||||||
|
"kafka-python",
|
||||||
|
);
|
||||||
|
|
||||||
|
let py_pubsub = lang::emit(&make_spec_with_adapter(
|
||||||
|
Lang::Python,
|
||||||
|
"projects/p/subscriptions/s",
|
||||||
|
"callback",
|
||||||
|
entry_file("pubsub_python"),
|
||||||
|
"pubsub-python",
|
||||||
|
))
|
||||||
|
.expect("emit pubsub-python");
|
||||||
|
assert_extra_file_contains(
|
||||||
|
&py_pubsub.extra_files,
|
||||||
|
"requirements.txt",
|
||||||
|
"google-cloud-pubsub",
|
||||||
|
"pubsub-python",
|
||||||
|
);
|
||||||
|
|
||||||
|
let py_rabbit = lang::emit(&make_spec_with_adapter(
|
||||||
|
Lang::Python,
|
||||||
|
"work",
|
||||||
|
"on_message",
|
||||||
|
entry_file("rabbit_python"),
|
||||||
|
"rabbit-python",
|
||||||
|
))
|
||||||
|
.expect("emit rabbit-python");
|
||||||
|
assert_extra_file_contains(
|
||||||
|
&py_rabbit.extra_files,
|
||||||
|
"requirements.txt",
|
||||||
|
"pika",
|
||||||
|
"rabbit-python",
|
||||||
|
);
|
||||||
|
|
||||||
|
let node_sqs = lang::emit(&make_spec_with_adapter(
|
||||||
|
Lang::JavaScript,
|
||||||
|
"jobs",
|
||||||
|
"handler",
|
||||||
|
entry_file("sqs_node"),
|
||||||
|
"sqs-node",
|
||||||
|
))
|
||||||
|
.expect("emit sqs-node");
|
||||||
|
assert_extra_file_contains(
|
||||||
|
&node_sqs.extra_files,
|
||||||
|
"package.json",
|
||||||
|
"@aws-sdk/client-sqs",
|
||||||
|
"sqs-node",
|
||||||
|
);
|
||||||
|
|
||||||
|
let java_kafka = lang::emit(&make_spec_with_adapter(
|
||||||
|
Lang::Java,
|
||||||
|
"orders",
|
||||||
|
"onMessage",
|
||||||
|
entry_file("kafka_java"),
|
||||||
|
"kafka-java",
|
||||||
|
))
|
||||||
|
.expect("emit kafka-java");
|
||||||
|
assert_extra_file_contains(
|
||||||
|
&java_kafka.extra_files,
|
||||||
|
"pom.xml",
|
||||||
|
"kafka-clients",
|
||||||
|
"kafka-java",
|
||||||
|
);
|
||||||
|
|
||||||
|
let java_rabbit = lang::emit(&make_spec_with_adapter(
|
||||||
|
Lang::Java,
|
||||||
|
"work",
|
||||||
|
"onMessage",
|
||||||
|
entry_file("rabbit_java"),
|
||||||
|
"rabbit-java",
|
||||||
|
))
|
||||||
|
.expect("emit rabbit-java");
|
||||||
|
assert_extra_file_contains(
|
||||||
|
&java_rabbit.extra_files,
|
||||||
|
"pom.xml",
|
||||||
|
"amqp-client",
|
||||||
|
"rabbit-java",
|
||||||
|
);
|
||||||
|
|
||||||
|
let go_pubsub = lang::emit(&make_spec_with_adapter(
|
||||||
|
Lang::Go,
|
||||||
|
"my-sub",
|
||||||
|
"OnMessage",
|
||||||
|
entry_file("pubsub_go"),
|
||||||
|
"pubsub-go",
|
||||||
|
))
|
||||||
|
.expect("emit pubsub-go");
|
||||||
|
assert_extra_file_contains(
|
||||||
|
&go_pubsub.extra_files,
|
||||||
|
"go.mod",
|
||||||
|
"cloud.google.com/go/pubsub",
|
||||||
|
"pubsub-go",
|
||||||
|
);
|
||||||
|
|
||||||
|
let go_nats = lang::emit(&make_spec_with_adapter(
|
||||||
|
Lang::Go,
|
||||||
|
"events",
|
||||||
|
"OnMessage",
|
||||||
|
entry_file("nats_go"),
|
||||||
|
"nats-go",
|
||||||
|
))
|
||||||
|
.expect("emit nats-go");
|
||||||
|
assert_extra_file_contains(
|
||||||
|
&go_nats.extra_files,
|
||||||
|
"go.mod",
|
||||||
|
"github.com/nats-io/nats.go",
|
||||||
|
"nats-go",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn message_handler_go_pubsub_tries_real_client_before_fallbacks() {
|
fn message_handler_go_pubsub_tries_real_client_before_fallbacks() {
|
||||||
let spec = make_spec_with_adapter(
|
let spec = make_spec_with_adapter(
|
||||||
|
|
|
||||||
|
|
@ -752,8 +752,33 @@ fn graphql_resolver_js_harness_carries_sentinel_and_field() {
|
||||||
let h = lang::emit(&spec).expect("emit ok");
|
let h = lang::emit(&spec).expect("emit ok");
|
||||||
assert!(h.source.contains("__NYX_GRAPHQL_RESOLVER__"));
|
assert!(h.source.contains("__NYX_GRAPHQL_RESOLVER__"));
|
||||||
assert!(h.source.contains("\"resolveUser\""));
|
assert!(h.source.contains("\"resolveUser\""));
|
||||||
|
assert!(h.source.contains("_nyxTryApolloServer"));
|
||||||
|
assert!(h.source.contains("require('@apollo/server')"));
|
||||||
assert!(h.source.contains("_nyxTryGraphqlJs"));
|
assert!(h.source.contains("_nyxTryGraphqlJs"));
|
||||||
assert!(h.source.contains("require('graphql')"));
|
assert!(h.source.contains("require('graphql')"));
|
||||||
|
assert!(
|
||||||
|
h.source.find("_nyxTryApolloServer").unwrap() < h.source.find("_nyxTryGraphqlJs").unwrap(),
|
||||||
|
"Apollo Server should run before the GraphQL.js fallback"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn graphql_resolver_js_apollo_stages_runtime_deps() {
|
||||||
|
let spec = framework_bound_spec(
|
||||||
|
Lang::JavaScript,
|
||||||
|
EvEntryKind::GraphQLResolver {
|
||||||
|
type_name: "Query".into(),
|
||||||
|
field: "user".into(),
|
||||||
|
},
|
||||||
|
"resolveUser",
|
||||||
|
"tests/dynamic_fixtures/graphql_resolver/apollo/vuln.js",
|
||||||
|
"graphql-apollo",
|
||||||
|
);
|
||||||
|
let h = lang::emit(&spec).expect("emit ok");
|
||||||
|
let package = extra_file_content(&h.extra_files, "package.json");
|
||||||
|
assert!(package.contains("\"@apollo/server\""));
|
||||||
|
assert!(package.contains("\"apollo-server\""));
|
||||||
|
assert!(package.contains("\"graphql\""));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue