diff --git a/src/dynamic/stubs/broker.rs b/src/dynamic/stubs/broker.rs index eaf3ec0b..2cc2103e 100644 --- a/src/dynamic/stubs/broker.rs +++ b/src/dynamic/stubs/broker.rs @@ -3839,9 +3839,8 @@ mod tests { amqp_push_u64(&mut ack, delivery_tag); ack.push(0); amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap(); - std::thread::sleep(Duration::from_millis(25)); - let events = stub.drain_events(); + let events = drain_events_until(&stub, 3, Duration::from_secs(5)); let actions: Vec<&str> = events .iter() .map(|ev| ev.detail.get("action").unwrap().as_str()) @@ -3917,9 +3916,8 @@ mod tests { amqp_push_u64(&mut ack, delivery_tag); ack.push(0); amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap(); - std::thread::sleep(Duration::from_millis(25)); - let events = stub.drain_events(); + let events = drain_events_until(&stub, 3, Duration::from_secs(5)); let actions: Vec<&str> = events .iter() .map(|ev| ev.detail.get("action").unwrap().as_str()) @@ -4005,9 +4003,8 @@ mod tests { amqp_push_u64(&mut ack, delivery_tag); ack.push(0); amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap(); - std::thread::sleep(Duration::from_millis(25)); - let events = stub.drain_events(); + let events = drain_events_until(&stub, 3, Duration::from_secs(5)); let actions: Vec<&str> = events .iter() .map(|ev| ev.detail.get("action").unwrap().as_str()) @@ -4102,9 +4099,8 @@ mod tests { amqp_push_u64(&mut ack, second_delivery_tag); ack.push(0); amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap(); - std::thread::sleep(Duration::from_millis(25)); - let events = stub.drain_events(); + let events = drain_events_until(&stub, 5, Duration::from_secs(5)); let actions: Vec<&str> = events .iter() .map(|ev| ev.detail.get("action").unwrap().as_str()) @@ -4375,6 +4371,26 @@ mod tests { out } + /// Poll `drain_events` until at least `want` events have accumulated + /// or `timeout` elapses, then return everything drained so far. + /// + /// The broker server records publish/deliver/ack observations on its + /// own thread by appending to the log file, so an `ack` frame written + /// to the socket is not guaranteed to be flushed to the log the + /// instant the test returns from the socket write. A fixed sleep races + /// that thread and is flaky under parallel test load. The cursor in + /// `drain_events` advances on every call, so accumulating across drains + /// is safe and order-preserving. + fn drain_events_until(stub: &BrokerStub, want: usize, timeout: Duration) -> Vec { + let deadline = std::time::Instant::now() + timeout; + let mut events = stub.drain_events(); + while events.len() < want && std::time::Instant::now() < deadline { + std::thread::sleep(Duration::from_millis(2)); + events.extend(stub.drain_events()); + } + events + } + fn read_until(reader: &mut BufReader, needle: &str) -> String { let mut out = String::new(); while !out.contains(needle) {