mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-05-05 13:22:37 +02:00
parent
19a83d9335
commit
6e6e320c47
2 changed files with 89 additions and 27 deletions
|
|
@ -88,8 +88,13 @@ async def test_subscriber_deferred_acknowledgment_success():
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_subscriber_deferred_acknowledgment_failure():
|
async def test_subscriber_dropped_message_still_acks():
|
||||||
"""Verify Subscriber negative acks on delivery failure."""
|
"""Verify Subscriber acks even when message is dropped (backpressure).
|
||||||
|
|
||||||
|
This prevents redelivery storms on shared topics - if we negative_ack
|
||||||
|
a dropped message, it gets redelivered to all subscribers, none of
|
||||||
|
whom can handle it either, causing a tight redelivery loop.
|
||||||
|
"""
|
||||||
mock_backend = MagicMock()
|
mock_backend = MagicMock()
|
||||||
mock_consumer = MagicMock()
|
mock_consumer = MagicMock()
|
||||||
mock_backend.create_consumer.return_value = mock_consumer
|
mock_backend.create_consumer.return_value = mock_consumer
|
||||||
|
|
@ -111,15 +116,57 @@ async def test_subscriber_deferred_acknowledgment_failure():
|
||||||
queue = await subscriber.subscribe("test-queue")
|
queue = await subscriber.subscribe("test-queue")
|
||||||
await queue.put({"existing": "data"})
|
await queue.put({"existing": "data"})
|
||||||
|
|
||||||
# Create mock message - should be dropped
|
# Create mock message - should be dropped due to full queue
|
||||||
msg = create_mock_message("msg-1", {"data": "test"})
|
msg = create_mock_message("test-queue", {"data": "test"})
|
||||||
|
|
||||||
# Process message (should fail due to full queue + drop_new strategy)
|
# Process message (should be dropped due to full queue + drop_new strategy)
|
||||||
await subscriber._process_message(msg)
|
await subscriber._process_message(msg)
|
||||||
|
|
||||||
# Should negative acknowledge failed delivery
|
# Should acknowledge even though delivery failed - prevents redelivery storm
|
||||||
mock_consumer.negative_acknowledge.assert_called_once_with(msg)
|
mock_consumer.acknowledge.assert_called_once_with(msg)
|
||||||
mock_consumer.acknowledge.assert_not_called()
|
mock_consumer.negative_acknowledge.assert_not_called()
|
||||||
|
|
||||||
|
# Clean up
|
||||||
|
await subscriber.stop()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_subscriber_orphaned_message_acks():
|
||||||
|
"""Verify Subscriber acks orphaned messages (no matching waiter).
|
||||||
|
|
||||||
|
On shared response topics, if a message arrives for a waiter that
|
||||||
|
no longer exists (e.g., client disconnected, request timed out),
|
||||||
|
we must acknowledge it to prevent redelivery storms.
|
||||||
|
"""
|
||||||
|
mock_backend = MagicMock()
|
||||||
|
mock_consumer = MagicMock()
|
||||||
|
mock_backend.create_consumer.return_value = mock_consumer
|
||||||
|
|
||||||
|
subscriber = Subscriber(
|
||||||
|
backend=mock_backend,
|
||||||
|
topic="test-topic",
|
||||||
|
subscription="test-subscription",
|
||||||
|
consumer_name="test-consumer",
|
||||||
|
schema=dict,
|
||||||
|
max_size=10,
|
||||||
|
backpressure_strategy="block"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Start subscriber to initialize consumer
|
||||||
|
await subscriber.start()
|
||||||
|
|
||||||
|
# Don't create any queues - message will be orphaned
|
||||||
|
# This simulates a response arriving after the waiter has unsubscribed
|
||||||
|
|
||||||
|
# Create mock message with an ID that has no matching waiter
|
||||||
|
msg = create_mock_message("non-existent-waiter-id", {"data": "orphaned"})
|
||||||
|
|
||||||
|
# Process message (should be orphaned - no matching waiter)
|
||||||
|
await subscriber._process_message(msg)
|
||||||
|
|
||||||
|
# Should acknowledge orphaned message - prevents redelivery storm
|
||||||
|
mock_consumer.acknowledge.assert_called_once_with(msg)
|
||||||
|
mock_consumer.negative_acknowledge.assert_not_called()
|
||||||
|
|
||||||
# Clean up
|
# Clean up
|
||||||
await subscriber.stop()
|
await subscriber.stop()
|
||||||
|
|
|
||||||
|
|
@ -230,27 +230,42 @@ class Subscriber:
|
||||||
|
|
||||||
value = msg.value()
|
value = msg.value()
|
||||||
delivery_success = False
|
delivery_success = False
|
||||||
|
has_matching_waiter = False
|
||||||
|
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
# Deliver to specific subscribers
|
# Deliver to specific subscribers
|
||||||
if id in self.q:
|
if id in self.q:
|
||||||
|
has_matching_waiter = True
|
||||||
delivery_success = await self._deliver_to_queue(
|
delivery_success = await self._deliver_to_queue(
|
||||||
self.q[id], value
|
self.q[id], value
|
||||||
)
|
)
|
||||||
|
|
||||||
# Deliver to all subscribers
|
# Deliver to all subscribers
|
||||||
for q in self.full.values():
|
for q in self.full.values():
|
||||||
|
has_matching_waiter = True
|
||||||
if await self._deliver_to_queue(q, value):
|
if await self._deliver_to_queue(q, value):
|
||||||
delivery_success = True
|
delivery_success = True
|
||||||
|
|
||||||
# Acknowledge only on successful delivery
|
# Always acknowledge the message to prevent redelivery storms
|
||||||
if delivery_success:
|
# on shared topics. Negative acknowledging orphaned messages
|
||||||
|
# (no matching waiter) causes immediate redelivery to all
|
||||||
|
# subscribers, none of whom can handle it either.
|
||||||
self.consumer.acknowledge(msg)
|
self.consumer.acknowledge(msg)
|
||||||
del self.pending_acks[msg_id]
|
del self.pending_acks[msg_id]
|
||||||
|
|
||||||
|
if not delivery_success:
|
||||||
|
if not has_matching_waiter:
|
||||||
|
# Message arrived for a waiter that no longer exists
|
||||||
|
# (likely due to client disconnect or timeout)
|
||||||
|
logger.debug(
|
||||||
|
f"Discarding orphaned message with id={id} - "
|
||||||
|
"no matching waiter"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# Negative acknowledge for retry
|
# Delivery failed (e.g., queue full with drop_new strategy)
|
||||||
self.consumer.negative_acknowledge(msg)
|
logger.debug(
|
||||||
del self.pending_acks[msg_id]
|
f"Message with id={id} dropped due to backpressure"
|
||||||
|
)
|
||||||
|
|
||||||
async def _deliver_to_queue(self, queue, value):
|
async def _deliver_to_queue(self, queue, value):
|
||||||
"""Deliver message to queue with backpressure handling"""
|
"""Deliver message to queue with backpressure handling"""
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue