diff --git a/tests/unit/test_pubsub/test_kafka_backend.py b/tests/unit/test_pubsub/test_kafka_backend.py index 456386f0..d51b1817 100644 --- a/tests/unit/test_pubsub/test_kafka_backend.py +++ b/tests/unit/test_pubsub/test_kafka_backend.py @@ -57,9 +57,9 @@ class TestKafkaParseTopic: backend._parse_topic('unknown:tg:topic') def test_topic_with_flow_suffix(self, backend): - """Topic names with flow suffix (e.g. :default) are preserved.""" + """Topic names with flow suffix (e.g. :default) have colons replaced with dots.""" name, cls, durable = backend._parse_topic('request:tg:prompt:default') - assert name == 'tg.request.prompt:default' + assert name == 'tg.request.prompt.default' class TestKafkaRetention: diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index d5c67d1b..5c59c515 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -58,7 +58,7 @@ class Consumer: # consumers in the same group causes rebalance storms where # no consumer can fetch. Cap to the backend's limit. max_concurrency = getattr(backend, 'max_consumer_concurrency', None) - if max_concurrency is not None and concurrency > max_concurrency: + if isinstance(max_concurrency, int) and concurrency > max_concurrency: logger.info( f"Capping concurrency from {concurrency} to " f"{max_concurrency} (backend limit)"