From 47dfc30c1cc0f6c1895ebafca1200408bc526c0c Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Mon, 18 May 2026 22:08:52 +0100 Subject: [PATCH] fix: suppress Pulsar C++ client log noise (#936) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Revert consumer receive timeout from 100ms back to the original 2000ms. The 100ms change was based on a misunderstanding — receive() is a blocking call that returns immediately when a message arrives, so the timeout only affects how quickly a consumer checks the shutdown flag during idle periods. 100ms generated ~200 WARN lines/sec from the C++ client with no latency benefit. Also set the Pulsar C++ client logger to Error level so residual timeout warnings from the subscriber (250ms) don't produce noise. Update poll timeout test to match reverted 2000ms value --- .../test_consumer_concurrency.py | 17 ++++++++--------- trustgraph-base/trustgraph/base/consumer.py | 2 +- .../trustgraph/base/pulsar_backend.py | 4 ++++ 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/tests/unit/test_concurrency/test_consumer_concurrency.py b/tests/unit/test_concurrency/test_consumer_concurrency.py index 59c7f2b5..44d82182 100644 --- a/tests/unit/test_concurrency/test_consumer_concurrency.py +++ b/tests/unit/test_concurrency/test_consumer_concurrency.py @@ -272,23 +272,22 @@ class TestMetricsIntegration: class TestPollTimeout: @pytest.mark.asyncio - async def test_poll_timeout_is_100ms(self): - """Consumer receive timeout should be 100ms, not the original 2000ms. + async def test_poll_timeout_is_2000ms(self): + """Consumer receive timeout should be 2000ms. - A 2000ms poll timeout means every service adds up to 2s of idle - blocking between message bursts. With many sequential hops in a - query pipeline, this compounds into seconds of unnecessary latency. - 100ms keeps responsiveness high without significant CPU overhead. + receive() is a blocking call that returns immediately when a + message arrives — the timeout only governs how often the loop + checks the shutdown flag during idle periods. Lower values + (e.g. 100ms) generate excessive C++ client WARN logging with + no latency benefit. """ consumer = _make_consumer() - # Wire up a mock Pulsar consumer that records the receive kwargs mock_pulsar_consumer = MagicMock() received_kwargs = {} def capture_receive(**kwargs): received_kwargs.update(kwargs) - # Stop after one call consumer.running = False raise type('Timeout', (Exception,), {})("timeout") @@ -296,7 +295,7 @@ class TestPollTimeout: await consumer.consume_from_queue(mock_pulsar_consumer) - assert received_kwargs.get("timeout_millis") == 100 + assert received_kwargs.get("timeout_millis") == 2000 # --------------------------------------------------------------------------- diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index 5c59c515..86cc4ceb 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -188,7 +188,7 @@ class Consumer: try: msg = await loop.run_in_executor( executor, - lambda: consumer.receive(timeout_millis=100), + lambda: consumer.receive(timeout_millis=2000), ) except Exception as e: # Handle timeout from any backend diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index e27d16af..dc5e4083 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -139,6 +139,10 @@ class PulsarBackend: if api_key: client_args['authentication'] = pulsar.AuthenticationToken(api_key) + client_args['logger'] = pulsar.ConsoleLogger( + _pulsar.LoggerLevel.Error + ) + self.client = pulsar.Client(**client_args) logger.info(f"Pulsar client connected to {host}")