diff --git a/tests/unit/test_base/test_subscriber_graceful_shutdown.py b/tests/unit/test_base/test_subscriber_graceful_shutdown.py index ea5d04cc..0587e3d6 100644 --- a/tests/unit/test_base/test_subscriber_graceful_shutdown.py +++ b/tests/unit/test_base/test_subscriber_graceful_shutdown.py @@ -88,8 +88,13 @@ async def test_subscriber_deferred_acknowledgment_success(): @pytest.mark.asyncio -async def test_subscriber_deferred_acknowledgment_failure(): - """Verify Subscriber negative acks on delivery failure.""" +async def test_subscriber_dropped_message_still_acks(): + """Verify Subscriber acks even when message is dropped (backpressure). + + This prevents redelivery storms on shared topics - if we negative_ack + a dropped message, it gets redelivered to all subscribers, none of + whom can handle it either, causing a tight redelivery loop. + """ mock_backend = MagicMock() mock_consumer = MagicMock() mock_backend.create_consumer.return_value = mock_consumer @@ -103,24 +108,66 @@ async def test_subscriber_deferred_acknowledgment_failure(): max_size=1, # Very small queue backpressure_strategy="drop_new" ) - + # Start subscriber to initialize consumer await subscriber.start() - + # Create queue and fill it queue = await subscriber.subscribe("test-queue") await queue.put({"existing": "data"}) - - # Create mock message - should be dropped - msg = create_mock_message("msg-1", {"data": "test"}) - - # Process message (should fail due to full queue + drop_new strategy) + + # Create mock message - should be dropped due to full queue + msg = create_mock_message("test-queue", {"data": "test"}) + + # Process message (should be dropped due to full queue + drop_new strategy) await subscriber._process_message(msg) - - # Should negative acknowledge failed delivery - mock_consumer.negative_acknowledge.assert_called_once_with(msg) - mock_consumer.acknowledge.assert_not_called() - + + # Should acknowledge even though delivery failed - prevents redelivery storm + mock_consumer.acknowledge.assert_called_once_with(msg) + mock_consumer.negative_acknowledge.assert_not_called() + + # Clean up + await subscriber.stop() + + +@pytest.mark.asyncio +async def test_subscriber_orphaned_message_acks(): + """Verify Subscriber acks orphaned messages (no matching waiter). + + On shared response topics, if a message arrives for a waiter that + no longer exists (e.g., client disconnected, request timed out), + we must acknowledge it to prevent redelivery storms. + """ + mock_backend = MagicMock() + mock_consumer = MagicMock() + mock_backend.create_consumer.return_value = mock_consumer + + subscriber = Subscriber( + backend=mock_backend, + topic="test-topic", + subscription="test-subscription", + consumer_name="test-consumer", + schema=dict, + max_size=10, + backpressure_strategy="block" + ) + + # Start subscriber to initialize consumer + await subscriber.start() + + # Don't create any queues - message will be orphaned + # This simulates a response arriving after the waiter has unsubscribed + + # Create mock message with an ID that has no matching waiter + msg = create_mock_message("non-existent-waiter-id", {"data": "orphaned"}) + + # Process message (should be orphaned - no matching waiter) + await subscriber._process_message(msg) + + # Should acknowledge orphaned message - prevents redelivery storm + mock_consumer.acknowledge.assert_called_once_with(msg) + mock_consumer.negative_acknowledge.assert_not_called() + # Clean up await subscriber.stop() diff --git a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py index d59bcab3..b0d90507 100644 --- a/trustgraph-base/trustgraph/base/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -222,35 +222,50 @@ class Subscriber: # Store message for later acknowledgment msg_id = str(uuid.uuid4()) self.pending_acks[msg_id] = msg - + try: id = msg.properties()["id"] except: id = None - + value = msg.value() delivery_success = False - + has_matching_waiter = False + async with self.lock: # Deliver to specific subscribers if id in self.q: + has_matching_waiter = True delivery_success = await self._deliver_to_queue( self.q[id], value ) - + # Deliver to all subscribers for q in self.full.values(): + has_matching_waiter = True if await self._deliver_to_queue(q, value): delivery_success = True - - # Acknowledge only on successful delivery - if delivery_success: - self.consumer.acknowledge(msg) - del self.pending_acks[msg_id] - else: - # Negative acknowledge for retry - self.consumer.negative_acknowledge(msg) - del self.pending_acks[msg_id] + + # Always acknowledge the message to prevent redelivery storms + # on shared topics. Negative acknowledging orphaned messages + # (no matching waiter) causes immediate redelivery to all + # subscribers, none of whom can handle it either. + self.consumer.acknowledge(msg) + del self.pending_acks[msg_id] + + if not delivery_success: + if not has_matching_waiter: + # Message arrived for a waiter that no longer exists + # (likely due to client disconnect or timeout) + logger.debug( + f"Discarding orphaned message with id={id} - " + "no matching waiter" + ) + else: + # Delivery failed (e.g., queue full with drop_new strategy) + logger.debug( + f"Message with id={id} dropped due to backpressure" + ) async def _deliver_to_queue(self, queue, value): """Deliver message to queue with backpressure handling"""