diff --git a/tests/unit/test_concurrency/test_consumer_concurrency.py b/tests/unit/test_concurrency/test_consumer_concurrency.py index 59c7f2b5..44d82182 100644 --- a/tests/unit/test_concurrency/test_consumer_concurrency.py +++ b/tests/unit/test_concurrency/test_consumer_concurrency.py @@ -272,23 +272,22 @@ class TestMetricsIntegration: class TestPollTimeout: @pytest.mark.asyncio - async def test_poll_timeout_is_100ms(self): - """Consumer receive timeout should be 100ms, not the original 2000ms. + async def test_poll_timeout_is_2000ms(self): + """Consumer receive timeout should be 2000ms. - A 2000ms poll timeout means every service adds up to 2s of idle - blocking between message bursts. With many sequential hops in a - query pipeline, this compounds into seconds of unnecessary latency. - 100ms keeps responsiveness high without significant CPU overhead. + receive() is a blocking call that returns immediately when a + message arrives — the timeout only governs how often the loop + checks the shutdown flag during idle periods. Lower values + (e.g. 100ms) generate excessive C++ client WARN logging with + no latency benefit. """ consumer = _make_consumer() - # Wire up a mock Pulsar consumer that records the receive kwargs mock_pulsar_consumer = MagicMock() received_kwargs = {} def capture_receive(**kwargs): received_kwargs.update(kwargs) - # Stop after one call consumer.running = False raise type('Timeout', (Exception,), {})("timeout") @@ -296,7 +295,7 @@ class TestPollTimeout: await consumer.consume_from_queue(mock_pulsar_consumer) - assert received_kwargs.get("timeout_millis") == 100 + assert received_kwargs.get("timeout_millis") == 2000 # --------------------------------------------------------------------------- diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index 5c59c515..86cc4ceb 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -188,7 +188,7 @@ class Consumer: try: msg = await loop.run_in_executor( executor, - lambda: consumer.receive(timeout_millis=100), + lambda: consumer.receive(timeout_millis=2000), ) except Exception as e: # Handle timeout from any backend diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index e27d16af..dc5e4083 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -139,6 +139,10 @@ class PulsarBackend: if api_key: client_args['authentication'] = pulsar.AuthenticationToken(api_key) + client_args['logger'] = pulsar.ConsoleLogger( + _pulsar.LoggerLevel.Error + ) + self.client = pulsar.Client(**client_args) logger.info(f"Pulsar client connected to {host}")