Fix subscriber unexpected message causing queue clogging (#642)

queue clogging.
This commit is contained in:
cybermaggedon 2026-02-23 14:34:05 +00:00 committed by GitHub
parent 0116eb3dea
commit 5ffad92345
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 89 additions and 27 deletions

View file

@ -88,8 +88,13 @@ async def test_subscriber_deferred_acknowledgment_success():
@pytest.mark.asyncio
async def test_subscriber_deferred_acknowledgment_failure():
"""Verify Subscriber negative acks on delivery failure."""
async def test_subscriber_dropped_message_still_acks():
"""Verify Subscriber acks even when message is dropped (backpressure).
This prevents redelivery storms on shared topics - if we negative_ack
a dropped message, it gets redelivered to all subscribers, none of
whom can handle it either, causing a tight redelivery loop.
"""
mock_backend = MagicMock()
mock_consumer = MagicMock()
mock_backend.create_consumer.return_value = mock_consumer
@ -103,24 +108,66 @@ async def test_subscriber_deferred_acknowledgment_failure():
max_size=1, # Very small queue
backpressure_strategy="drop_new"
)
# Start subscriber to initialize consumer
await subscriber.start()
# Create queue and fill it
queue = await subscriber.subscribe("test-queue")
await queue.put({"existing": "data"})
# Create mock message - should be dropped
msg = create_mock_message("msg-1", {"data": "test"})
# Process message (should fail due to full queue + drop_new strategy)
# Create mock message - should be dropped due to full queue
msg = create_mock_message("test-queue", {"data": "test"})
# Process message (should be dropped due to full queue + drop_new strategy)
await subscriber._process_message(msg)
# Should negative acknowledge failed delivery
mock_consumer.negative_acknowledge.assert_called_once_with(msg)
mock_consumer.acknowledge.assert_not_called()
# Should acknowledge even though delivery failed - prevents redelivery storm
mock_consumer.acknowledge.assert_called_once_with(msg)
mock_consumer.negative_acknowledge.assert_not_called()
# Clean up
await subscriber.stop()
@pytest.mark.asyncio
async def test_subscriber_orphaned_message_acks():
"""Verify Subscriber acks orphaned messages (no matching waiter).
On shared response topics, if a message arrives for a waiter that
no longer exists (e.g., client disconnected, request timed out),
we must acknowledge it to prevent redelivery storms.
"""
mock_backend = MagicMock()
mock_consumer = MagicMock()
mock_backend.create_consumer.return_value = mock_consumer
subscriber = Subscriber(
backend=mock_backend,
topic="test-topic",
subscription="test-subscription",
consumer_name="test-consumer",
schema=dict,
max_size=10,
backpressure_strategy="block"
)
# Start subscriber to initialize consumer
await subscriber.start()
# Don't create any queues - message will be orphaned
# This simulates a response arriving after the waiter has unsubscribed
# Create mock message with an ID that has no matching waiter
msg = create_mock_message("non-existent-waiter-id", {"data": "orphaned"})
# Process message (should be orphaned - no matching waiter)
await subscriber._process_message(msg)
# Should acknowledge orphaned message - prevents redelivery storm
mock_consumer.acknowledge.assert_called_once_with(msg)
mock_consumer.negative_acknowledge.assert_not_called()
# Clean up
await subscriber.stop()

View file

@ -222,35 +222,50 @@ class Subscriber:
# Store message for later acknowledgment
msg_id = str(uuid.uuid4())
self.pending_acks[msg_id] = msg
try:
id = msg.properties()["id"]
except:
id = None
value = msg.value()
delivery_success = False
has_matching_waiter = False
async with self.lock:
# Deliver to specific subscribers
if id in self.q:
has_matching_waiter = True
delivery_success = await self._deliver_to_queue(
self.q[id], value
)
# Deliver to all subscribers
for q in self.full.values():
has_matching_waiter = True
if await self._deliver_to_queue(q, value):
delivery_success = True
# Acknowledge only on successful delivery
if delivery_success:
self.consumer.acknowledge(msg)
del self.pending_acks[msg_id]
else:
# Negative acknowledge for retry
self.consumer.negative_acknowledge(msg)
del self.pending_acks[msg_id]
# Always acknowledge the message to prevent redelivery storms
# on shared topics. Negative acknowledging orphaned messages
# (no matching waiter) causes immediate redelivery to all
# subscribers, none of whom can handle it either.
self.consumer.acknowledge(msg)
del self.pending_acks[msg_id]
if not delivery_success:
if not has_matching_waiter:
# Message arrived for a waiter that no longer exists
# (likely due to client disconnect or timeout)
logger.debug(
f"Discarding orphaned message with id={id} - "
"no matching waiter"
)
else:
# Delivery failed (e.g., queue full with drop_new strategy)
logger.debug(
f"Message with id={id} dropped due to backpressure"
)
async def _deliver_to_queue(self, queue, value):
"""Deliver message to queue with backpressure handling"""