feat(dynamic): replace fixed sleeps with drain_events_until for reliable event handling in tests under load

This commit is contained in:
elipeter 2026-06-02 14:11:22 -05:00
parent 879f965379
commit 1f5777ff11

View file

@ -3839,9 +3839,8 @@ mod tests {
amqp_push_u64(&mut ack, delivery_tag);
ack.push(0);
amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap();
std::thread::sleep(Duration::from_millis(25));
let events = stub.drain_events();
let events = drain_events_until(&stub, 3, Duration::from_secs(5));
let actions: Vec<&str> = events
.iter()
.map(|ev| ev.detail.get("action").unwrap().as_str())
@ -3917,9 +3916,8 @@ mod tests {
amqp_push_u64(&mut ack, delivery_tag);
ack.push(0);
amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap();
std::thread::sleep(Duration::from_millis(25));
let events = stub.drain_events();
let events = drain_events_until(&stub, 3, Duration::from_secs(5));
let actions: Vec<&str> = events
.iter()
.map(|ev| ev.detail.get("action").unwrap().as_str())
@ -4005,9 +4003,8 @@ mod tests {
amqp_push_u64(&mut ack, delivery_tag);
ack.push(0);
amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap();
std::thread::sleep(Duration::from_millis(25));
let events = stub.drain_events();
let events = drain_events_until(&stub, 3, Duration::from_secs(5));
let actions: Vec<&str> = events
.iter()
.map(|ev| ev.detail.get("action").unwrap().as_str())
@ -4102,9 +4099,8 @@ mod tests {
amqp_push_u64(&mut ack, second_delivery_tag);
ack.push(0);
amqp_write_method(&mut s, 1, 60, 80, &ack).unwrap();
std::thread::sleep(Duration::from_millis(25));
let events = stub.drain_events();
let events = drain_events_until(&stub, 5, Duration::from_secs(5));
let actions: Vec<&str> = events
.iter()
.map(|ev| ev.detail.get("action").unwrap().as_str())
@ -4375,6 +4371,26 @@ mod tests {
out
}
/// Poll `drain_events` until at least `want` events have accumulated
/// or `timeout` elapses, then return everything drained so far.
///
/// The broker server records publish/deliver/ack observations on its
/// own thread by appending to the log file, so an `ack` frame written
/// to the socket is not guaranteed to be flushed to the log the
/// instant the test returns from the socket write. A fixed sleep races
/// that thread and is flaky under parallel test load. The cursor in
/// `drain_events` advances on every call, so accumulating across drains
/// is safe and order-preserving.
fn drain_events_until(stub: &BrokerStub, want: usize, timeout: Duration) -> Vec<StubEvent> {
let deadline = std::time::Instant::now() + timeout;
let mut events = stub.drain_events();
while events.len() < want && std::time::Instant::now() < deadline {
std::thread::sleep(Duration::from_millis(2));
events.extend(stub.drain_events());
}
events
}
fn read_until(reader: &mut BufReader<TcpStream>, needle: &str) -> String {
let mut out = String::new();
while !out.contains(needle) {