diff --git a/tests/unit/test_base/test_subscriber_graceful_shutdown.py b/tests/unit/test_base/test_subscriber_graceful_shutdown.py index ec14f66b..1a59f970 100644 --- a/tests/unit/test_base/test_subscriber_graceful_shutdown.py +++ b/tests/unit/test_base/test_subscriber_graceful_shutdown.py @@ -236,6 +236,10 @@ async def test_subscriber_graceful_shutdown(): with patch.object(subscriber, 'run') as mock_run: # Mock run that simulates graceful shutdown async def mock_run_graceful(): + # Honor the readiness contract: real run() signals _ready + # after binding the consumer, so start() can unblock. Mocks + # of run() must do the same or start() hangs forever. + subscriber._ready.set_result(None) # Process messages while running, then drain while subscriber.running or subscriber.draining: if subscriber.draining: @@ -337,6 +341,8 @@ async def test_subscriber_pending_acks_cleanup(): with patch.object(subscriber, 'run') as mock_run: # Mock run that simulates cleanup of pending acks async def mock_run_cleanup(): + # Honor the readiness contract — see test_subscriber_graceful_shutdown. + subscriber._ready.set_result(None) while subscriber.running or subscriber.draining: await asyncio.sleep(0.05) if subscriber.draining: diff --git a/tests/unit/test_base/test_subscriber_readiness.py b/tests/unit/test_base/test_subscriber_readiness.py new file mode 100644 index 00000000..e1ef47de --- /dev/null +++ b/tests/unit/test_base/test_subscriber_readiness.py @@ -0,0 +1,189 @@ +""" +Regression tests for Subscriber.start() readiness barrier. + +Background: prior to the eager-connect fix, Subscriber.start() created +the run() task and returned immediately. The underlying backend consumer +was lazily connected on its first receive() call, which left a setup +race for request/response clients using ephemeral per-subscriber response +queues (RabbitMQ auto-delete exclusive queues): the request would be +published before the response queue was bound, and the broker would +silently drop the reply. fetch_config(), document-embeddings, and +api-gateway all hit this with "Failed to fetch config on notify" / +"Request timeout exception" symptoms. + +These tests pin the readiness contract: + + await subscriber.start() + # at this point, consumer.ensure_connected() MUST have run + +so that any future change which removes the eager bind, or moves it +back to lazy initialisation, fails CI loudly. +""" + +import asyncio + +import pytest +from unittest.mock import MagicMock + +from trustgraph.base.subscriber import Subscriber + + +def _make_backend(ensure_connected_side_effect=None, + receive_side_effect=None): + """Build a fake backend whose consumer records ensure_connected / + receive calls. ensure_connected_side_effect lets a test inject a + delay or exception.""" + backend = MagicMock() + consumer = MagicMock() + + consumer.ensure_connected = MagicMock( + side_effect=ensure_connected_side_effect, + ) + + # By default receive raises a timeout-style exception that the + # subscriber loop is supposed to swallow as a "no message yet" — this + # keeps the subscriber idling cleanly while the test inspects state. + if receive_side_effect is None: + receive_side_effect = TimeoutError("No message received within timeout") + consumer.receive = MagicMock(side_effect=receive_side_effect) + + consumer.acknowledge = MagicMock() + consumer.negative_acknowledge = MagicMock() + consumer.pause_message_listener = MagicMock() + consumer.unsubscribe = MagicMock() + consumer.close = MagicMock() + + backend.create_consumer.return_value = consumer + return backend, consumer + + +def _make_subscriber(backend): + return Subscriber( + backend=backend, + topic="response:tg:config", + subscription="test-sub", + consumer_name="test-consumer", + schema=dict, + max_size=10, + drain_timeout=1.0, + backpressure_strategy="block", + ) + + +class TestSubscriberReadiness: + + @pytest.mark.asyncio + async def test_start_calls_ensure_connected_before_returning(self): + """The barrier: ensure_connected must have been invoked at least + once by the time start() returns.""" + backend, consumer = _make_backend() + subscriber = _make_subscriber(backend) + + await subscriber.start() + + try: + consumer.ensure_connected.assert_called_once() + finally: + await subscriber.stop() + + @pytest.mark.asyncio + async def test_start_blocks_until_ensure_connected_completes(self): + """If ensure_connected is slow, start() must wait for it. This is + the actual race-condition guard — it would have failed against + the buggy version where start() returned before run() had even + scheduled the consumer creation.""" + connect_started = asyncio.Event() + release_connect = asyncio.Event() + + # ensure_connected runs in the executor thread, so we need a + # threading-safe gate. Use a simple busy-wait on a flag set by + # the asyncio side via call_soon_threadsafe — but the simpler + # path is to give it a sleep and observe ordering. + import threading + gate = threading.Event() + + def slow_connect(): + connect_started.set() # safe: only mutates the Event flag + gate.wait(timeout=2.0) + + backend, consumer = _make_backend( + ensure_connected_side_effect=slow_connect, + ) + subscriber = _make_subscriber(backend) + + start_task = asyncio.create_task(subscriber.start()) + + # Wait until ensure_connected has begun executing. + await asyncio.wait_for(connect_started.wait(), timeout=2.0) + + # ensure_connected is in flight — start() must NOT have returned. + assert not start_task.done(), ( + "start() returned before ensure_connected() completed — " + "the readiness barrier is broken and the request/response " + "race condition is back." + ) + + # Release the gate; start() should now complete promptly. + gate.set() + await asyncio.wait_for(start_task, timeout=2.0) + + consumer.ensure_connected.assert_called_once() + + await subscriber.stop() + + @pytest.mark.asyncio + async def test_start_propagates_consumer_creation_failure(self): + """If create_consumer() raises, start() must surface the error + rather than hang on the readiness future. The old code path + retried indefinitely inside run() and never let start() unblock.""" + backend = MagicMock() + backend.create_consumer.side_effect = RuntimeError("broker down") + + subscriber = _make_subscriber(backend) + + with pytest.raises(RuntimeError, match="broker down"): + await asyncio.wait_for(subscriber.start(), timeout=2.0) + + @pytest.mark.asyncio + async def test_start_propagates_ensure_connected_failure(self): + """Same contract for an ensure_connected() that raises (e.g. the + broker is up but the queue declare/bind fails).""" + backend, consumer = _make_backend( + ensure_connected_side_effect=RuntimeError("queue declare failed"), + ) + subscriber = _make_subscriber(backend) + + with pytest.raises(RuntimeError, match="queue declare failed"): + await asyncio.wait_for(subscriber.start(), timeout=2.0) + + @pytest.mark.asyncio + async def test_ensure_connected_runs_before_subscriber_running_log(self): + """Subtle ordering: ensure_connected MUST happen before the + receive loop, so that any reply is captured. We assert this by + checking ensure_connected was called before any receive call.""" + call_order = [] + + def record_ensure(): + call_order.append("ensure_connected") + + def record_receive(*args, **kwargs): + call_order.append("receive") + raise TimeoutError("No message received within timeout") + + backend, consumer = _make_backend( + ensure_connected_side_effect=record_ensure, + receive_side_effect=record_receive, + ) + subscriber = _make_subscriber(backend) + + await subscriber.start() + + # Give the receive loop a tick to run at least once. + await asyncio.sleep(0.05) + + await subscriber.stop() + + # ensure_connected must come first; receive may not have happened + # yet on a fast machine, but if it did, it must come after. + assert call_order, "neither ensure_connected nor receive was called" + assert call_order[0] == "ensure_connected" diff --git a/tests/unit/test_chunking/test_recursive_chunker.py b/tests/unit/test_chunking/test_recursive_chunker.py index a5ec59c8..d1a5d247 100644 --- a/tests/unit/test_chunking/test_recursive_chunker.py +++ b/tests/unit/test_chunking/test_recursive_chunker.py @@ -70,11 +70,12 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): # Mock message and flow mock_message = MagicMock() mock_consumer = MagicMock() + # Flow exposes parameter lookup via __call__: flow("chunk-size") mock_flow = MagicMock() - mock_flow.parameters.get.side_effect = lambda param: { + mock_flow.side_effect = lambda key: { "chunk-size": 2000, # Override chunk size "chunk-overlap": None # Use default chunk overlap - }.get(param) + }.get(key) # Act chunk_size, chunk_overlap = await processor.chunk_document( @@ -105,10 +106,10 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): mock_message = MagicMock() mock_consumer = MagicMock() mock_flow = MagicMock() - mock_flow.parameters.get.side_effect = lambda param: { + mock_flow.side_effect = lambda key: { "chunk-size": None, # Use default chunk size "chunk-overlap": 200 # Override chunk overlap - }.get(param) + }.get(key) # Act chunk_size, chunk_overlap = await processor.chunk_document( @@ -139,10 +140,10 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): mock_message = MagicMock() mock_consumer = MagicMock() mock_flow = MagicMock() - mock_flow.parameters.get.side_effect = lambda param: { + mock_flow.side_effect = lambda key: { "chunk-size": 1500, # Override chunk size "chunk-overlap": 150 # Override chunk overlap - }.get(param) + }.get(key) # Act chunk_size, chunk_overlap = await processor.chunk_document( @@ -195,15 +196,15 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): mock_consumer = MagicMock() mock_producer = AsyncMock() mock_triples_producer = AsyncMock() + # Flow.__call__ resolves parameters and producers/consumers from the + # same dict — merge both kinds here. mock_flow = MagicMock() - mock_flow.parameters.get.side_effect = lambda param: { + mock_flow.side_effect = lambda key: { "chunk-size": 1500, "chunk-overlap": 150, - }.get(param) - mock_flow.side_effect = lambda name: { "output": mock_producer, "triples": mock_triples_producer, - }.get(name) + }.get(key) # Act await processor.on_message(mock_message, mock_consumer, mock_flow) @@ -241,7 +242,7 @@ class TestRecursiveChunkerSimple(IsolatedAsyncioTestCase): mock_message = MagicMock() mock_consumer = MagicMock() mock_flow = MagicMock() - mock_flow.parameters.get.return_value = None # No overrides + mock_flow.side_effect = lambda key: None # No overrides # Act chunk_size, chunk_overlap = await processor.chunk_document( diff --git a/tests/unit/test_chunking/test_token_chunker.py b/tests/unit/test_chunking/test_token_chunker.py index f3f83904..dba4ca94 100644 --- a/tests/unit/test_chunking/test_token_chunker.py +++ b/tests/unit/test_chunking/test_token_chunker.py @@ -70,11 +70,12 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): # Mock message and flow mock_message = MagicMock() mock_consumer = MagicMock() + # Flow exposes parameter lookup via __call__: flow("chunk-size") mock_flow = MagicMock() - mock_flow.parameters.get.side_effect = lambda param: { + mock_flow.side_effect = lambda key: { "chunk-size": 400, # Override chunk size "chunk-overlap": None # Use default chunk overlap - }.get(param) + }.get(key) # Act chunk_size, chunk_overlap = await processor.chunk_document( @@ -105,10 +106,10 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): mock_message = MagicMock() mock_consumer = MagicMock() mock_flow = MagicMock() - mock_flow.parameters.get.side_effect = lambda param: { + mock_flow.side_effect = lambda key: { "chunk-size": None, # Use default chunk size "chunk-overlap": 25 # Override chunk overlap - }.get(param) + }.get(key) # Act chunk_size, chunk_overlap = await processor.chunk_document( @@ -139,10 +140,10 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): mock_message = MagicMock() mock_consumer = MagicMock() mock_flow = MagicMock() - mock_flow.parameters.get.side_effect = lambda param: { + mock_flow.side_effect = lambda key: { "chunk-size": 350, # Override chunk size "chunk-overlap": 30 # Override chunk overlap - }.get(param) + }.get(key) # Act chunk_size, chunk_overlap = await processor.chunk_document( @@ -195,15 +196,15 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): mock_consumer = MagicMock() mock_producer = AsyncMock() mock_triples_producer = AsyncMock() + # Flow.__call__ resolves parameters and producers/consumers from the + # same dict — merge both kinds here. mock_flow = MagicMock() - mock_flow.parameters.get.side_effect = lambda param: { + mock_flow.side_effect = lambda key: { "chunk-size": 400, "chunk-overlap": 40, - }.get(param) - mock_flow.side_effect = lambda name: { "output": mock_producer, "triples": mock_triples_producer, - }.get(name) + }.get(key) # Act await processor.on_message(mock_message, mock_consumer, mock_flow) @@ -245,7 +246,7 @@ class TestTokenChunkerSimple(IsolatedAsyncioTestCase): mock_message = MagicMock() mock_consumer = MagicMock() mock_flow = MagicMock() - mock_flow.parameters.get.return_value = None # No overrides + mock_flow.side_effect = lambda key: None # No overrides # Act chunk_size, chunk_overlap = await processor.chunk_document( diff --git a/trustgraph-base/trustgraph/base/backend.py b/trustgraph-base/trustgraph/base/backend.py index 9b9a42af..f0d6b397 100644 --- a/trustgraph-base/trustgraph/base/backend.py +++ b/trustgraph-base/trustgraph/base/backend.py @@ -58,6 +58,18 @@ class BackendProducer(Protocol): class BackendConsumer(Protocol): """Protocol for backend-specific consumer.""" + def ensure_connected(self) -> None: + """ + Eagerly establish the underlying connection and bind the queue. + + Backends that lazily connect on first receive() must implement this + so that callers can guarantee the consumer is fully bound — and + therefore able to receive responses — before any related request is + published. Backends that connect at construction time may make this + a no-op. + """ + ... + def receive(self, timeout_millis: int = 2000) -> Message: """ Receive a message from the topic. diff --git a/trustgraph-base/trustgraph/base/chunking_service.py b/trustgraph-base/trustgraph/base/chunking_service.py index d4bf4cd4..4bd78428 100644 --- a/trustgraph-base/trustgraph/base/chunking_service.py +++ b/trustgraph-base/trustgraph/base/chunking_service.py @@ -88,14 +88,14 @@ class ChunkingService(FlowProcessor): chunk_overlap = default_chunk_overlap try: - cs = flow.parameters.get("chunk-size") + cs = flow("chunk-size") if cs is not None: chunk_size = int(cs) except Exception as e: logger.warning(f"Could not parse chunk-size parameter: {e}") try: - co = flow.parameters.get("chunk-overlap") + co = flow("chunk-overlap") if co is not None: chunk_overlap = int(co) except Exception as e: diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index a567191e..6f125399 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -72,6 +72,16 @@ class PulsarBackendConsumer: self._consumer = pulsar_consumer self._schema_cls = schema_cls + def ensure_connected(self) -> None: + """No-op for Pulsar. + + PulsarBackend.create_consumer() calls client.subscribe() which is + synchronous and returns a fully-subscribed consumer, so the + consumer is already ready by the time this object is constructed. + Defined for parity with the BackendConsumer protocol used by + Subscriber.start()'s readiness barrier.""" + pass + def receive(self, timeout_millis: int = 2000) -> Message: """Receive a message. Raises TimeoutError if no message available.""" try: diff --git a/trustgraph-base/trustgraph/base/rabbitmq_backend.py b/trustgraph-base/trustgraph/base/rabbitmq_backend.py index 3fafcead..3d82185e 100644 --- a/trustgraph-base/trustgraph/base/rabbitmq_backend.py +++ b/trustgraph-base/trustgraph/base/rabbitmq_backend.py @@ -214,6 +214,18 @@ class RabbitMQBackendConsumer: and self._channel.is_open ) + def ensure_connected(self) -> None: + """Eagerly declare and bind the queue. + + Without this, the queue is only declared lazily on the first + receive() call. For request/response with ephemeral per-subscriber + response queues that is a race: a request published before the + response queue is bound will have its reply silently dropped by + the broker. Subscriber.start() calls this so callers get a hard + readiness barrier.""" + if not self._is_alive(): + self._connect() + def receive(self, timeout_millis: int = 2000) -> Message: """Receive a message. Raises TimeoutError if none available.""" if not self._is_alive(): diff --git a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py index 8c68e51c..82ffe8e2 100644 --- a/trustgraph-base/trustgraph/base/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -41,14 +41,55 @@ class Subscriber: self.consumer = None self.executor = None + # Readiness barrier — completed by run() once the underlying + # backend consumer is fully connected and bound. start() awaits + # this so callers know any subsequently published request will + # have a queue ready to receive its response. Without this, + # ephemeral per-subscriber response queues (RabbitMQ auto-delete + # exclusive queues) would race the request and lose the reply. + # A Future is used (rather than an Event) so that a first-attempt + # connection failure can be propagated to start() as an exception. + self._ready = None # created in start() so we have a running loop + def __del__(self): self.running = False async def start(self): + self._ready = asyncio.get_event_loop().create_future() self.task = asyncio.create_task(self.run()) + # Block until run() signals readiness OR exits. The future + # carries the outcome of the first connect attempt: a value on + # success, an exception on first-attempt failure. If run() exits + # without ever signalling (e.g. cancelled, or a code path bug), + # we surface that as a clear RuntimeError rather than hanging + # forever waiting on the future. + ready_wait = asyncio.ensure_future( + asyncio.shield(self._ready) + ) + try: + await asyncio.wait( + {self.task, ready_wait}, + return_when=asyncio.FIRST_COMPLETED, + ) + finally: + ready_wait.cancel() + + if self._ready.done(): + # Re-raise first-attempt connect failure if any. + self._ready.result() + return + + # run() exited before _ready was settled. Propagate its exception + # if it had one, otherwise raise a generic readiness error. + if self.task.done() and self.task.exception() is not None: + raise self.task.exception() + raise RuntimeError( + "Subscriber.run() exited before signalling readiness" + ) + async def stop(self): """Initiate graceful shutdown with draining""" self.running = False @@ -66,6 +107,7 @@ class Subscriber: async def run(self): """Enhanced run method with integrated draining logic""" + first_attempt = True while self.running or self.draining: if self.metrics: @@ -87,10 +129,27 @@ class Subscriber: ), ) + # Eagerly bind the queue. For backends that connect + # lazily on first receive (RabbitMQ), this is what + # closes the request/response setup race — without + # it the response queue is not bound until later and + # any reply published in the meantime is dropped. + await loop.run_in_executor( + self.executor, + lambda: self.consumer.ensure_connected(), + ) + if self.metrics: self.metrics.state("running") logger.info("Subscriber running...") + + # Signal start() that the consumer is ready. This must + # happen AFTER ensure_connected() above so callers can + # safely publish requests immediately after start() returns. + if first_attempt and not self._ready.done(): + self._ready.set_result(None) + first_attempt = False drain_end_time = None while self.running or self.draining: @@ -162,6 +221,16 @@ class Subscriber: except Exception as e: logger.error(f"Subscriber exception: {e}", exc_info=True) + # First-attempt connection failure: propagate to start() + # so the caller can decide what to do (retry, give up). + # Subsequent failures use the existing retry-with-backoff + # path so a long-lived subscriber survives broker blips. + if first_attempt and not self._ready.done(): + self._ready.set_exception(e) + first_attempt = False + # Falls through into finally for cleanup, then the + # outer return below ends run() so start() unblocks. + finally: # Negative acknowledge any pending messages for msg in self.pending_acks.values(): @@ -193,6 +262,11 @@ class Subscriber: if not self.running and not self.draining: return + # If start() has already returned with an exception there is + # nothing more to do — exit run() rather than busy-retry. + if self._ready.done() and self._ready.exception() is not None: + return + # Sleep before retry await asyncio.sleep(1)