From f0c9039b7606823132b3b23ed11190702e3da6ec Mon Sep 17 00:00:00 2001 From: Sreeram Venkatasubramanian Date: Tue, 7 Apr 2026 14:19:59 +0530 Subject: [PATCH] fix: reduce consumer poll timeout from 2000ms to 100ms --- .../test_consumer_concurrency.py | 35 +++++++++++++++++++ trustgraph-base/trustgraph/base/consumer.py | 2 +- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_concurrency/test_consumer_concurrency.py b/tests/unit/test_concurrency/test_consumer_concurrency.py index 3869aaf3..03244b73 100644 --- a/tests/unit/test_concurrency/test_consumer_concurrency.py +++ b/tests/unit/test_concurrency/test_consumer_concurrency.py @@ -266,6 +266,41 @@ class TestMetricsIntegration: mock_metrics.rate_limit.assert_called_once() +# --------------------------------------------------------------------------- +# Poll timeout +# --------------------------------------------------------------------------- + +class TestPollTimeout: + + @pytest.mark.asyncio + async def test_poll_timeout_is_100ms(self): + """Consumer receive timeout should be 100ms, not the original 2000ms. + + A 2000ms poll timeout means every service adds up to 2s of idle + blocking between message bursts. With many sequential hops in a + query pipeline, this compounds into seconds of unnecessary latency. + 100ms keeps responsiveness high without significant CPU overhead. + """ + consumer = _make_consumer() + + # Wire up a mock Pulsar consumer that records the receive kwargs + mock_pulsar_consumer = MagicMock() + received_kwargs = {} + + def capture_receive(**kwargs): + received_kwargs.update(kwargs) + # Stop after one call + consumer.running = False + raise type('Timeout', (Exception,), {})("timeout") + + mock_pulsar_consumer.receive = capture_receive + consumer.consumer = mock_pulsar_consumer + + await consumer.consume_from_queue() + + assert received_kwargs.get("timeout_millis") == 100 + + # --------------------------------------------------------------------------- # Stop / running flag # --------------------------------------------------------------------------- diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index 9ae35d49..4f8c9de5 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -165,7 +165,7 @@ class Consumer: try: msg = await asyncio.to_thread( consumer.receive, - timeout_millis=2000 + timeout_millis=100 ) except Exception as e: # Handle timeout from any backend