diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py
index 1b9e5999..d5c67d1b 100644
--- a/trustgraph-base/trustgraph/base/consumer.py
+++ b/trustgraph-base/trustgraph/base/consumer.py
@@ -54,6 +54,17 @@ class Consumer:
         self.running = True
         self.consumer_task = None
 
+        # Kafka topics are created with 1 partition, so multiple
+        # consumers in the same group causes rebalance storms where
+        # no consumer can fetch. Cap to the backend's limit.
+        max_concurrency = getattr(backend, 'max_consumer_concurrency', None)
+        if max_concurrency is not None and concurrency > max_concurrency:
+            logger.info(
+                f"Capping concurrency from {concurrency} to "
+                f"{max_concurrency} (backend limit)"
+            )
+            concurrency = max_concurrency
+
         self.concurrency = concurrency
 
         self.metrics = metrics
diff --git a/trustgraph-base/trustgraph/base/kafka_backend.py b/trustgraph-base/trustgraph/base/kafka_backend.py
index 8dfe8bfa..cec5f74f 100644
--- a/trustgraph-base/trustgraph/base/kafka_backend.py
+++ b/trustgraph-base/trustgraph/base/kafka_backend.py
@@ -83,6 +83,7 @@ class KafkaBackendProducer:
         self._producer = KafkaProducer({
             'bootstrap.servers': bootstrap_servers,
             'acks': 'all' if durable else '1',
+            'message.max.bytes': 10485760,
         })
 
     def send(self, message: Any, properties: dict = {}) -> None:
@@ -94,13 +95,23 @@
             for k, v in properties.items()
         ] if properties else None
 
+        self._delivery_error = None
+
+        def _on_delivery(err, msg):
+            if err:
+                self._delivery_error = err
+
         self._producer.produce(
             topic=self._topic_name,
             value=json_data,
             headers=headers,
+            on_delivery=_on_delivery,
        )
         self._producer.flush()
+
+        if self._delivery_error:
+            raise KafkaException(self._delivery_error)
 
     def flush(self) -> None:
         self._producer.flush()
 
@@ -126,15 +137,41 @@
         self._consumer = None
 
     def _connect(self):
+        import time
+        t0 = time.monotonic()
+
+        def _on_assign(consumer, partitions):
+            elapsed = time.monotonic() - t0
+            logger.info(
+                f"Partition assignment for {self._topic_name}: "
+                f"{[p.partition for p in partitions]} "
+                f"after {elapsed:.1f}s"
+            )
+
+        def _on_revoke(consumer, partitions):
+            logger.info(
+                f"Partition revoke for {self._topic_name}: "
+                f"{[p.partition for p in partitions]}"
+            )
+
         self._consumer = KafkaConsumer({
             'bootstrap.servers': self._bootstrap_servers,
             'group.id': self._group_id,
             'auto.offset.reset': self._auto_offset_reset,
             'enable.auto.commit': False,
+            'fetch.message.max.bytes': 10485760,
+            # Tighten group coordination timeouts for fast
+            # group join on single-member groups.
+            'session.timeout.ms': 6000,
+            'heartbeat.interval.ms': 1000,
         })
-        self._consumer.subscribe([self._topic_name])
+        self._consumer.subscribe(
+            [self._topic_name],
+            on_assign=_on_assign,
+            on_revoke=_on_revoke,
+        )
         logger.info(
-            f"Kafka consumer connected: topic={self._topic_name}, "
+            f"Kafka consumer subscribed: topic={self._topic_name}, "
             f"group={self._group_id}"
         )
 
@@ -151,11 +188,11 @@
         if not self._is_alive():
             self._connect()
 
-        # Force a partition assignment by polling briefly.
-        # Without this, the consumer may not be assigned partitions
-        # until the first real poll(), creating a race where the
-        # request is sent before assignment completes.
-        self._consumer.poll(timeout=1.0)
+        # Kick off group join. With auto.offset.reset=earliest
+        # on response/notify consumers, any messages published
+        # before assignment completes will be picked up once
+        # the consumer starts polling in receive().
+        self._consumer.poll(timeout=0.5)
 
     def receive(self, timeout_millis: int = 2000) -> Message:
         """Receive a message. Raises TimeoutError if none available."""
@@ -172,6 +209,8 @@
             error = msg.error()
             if error.code() == KafkaError._PARTITION_EOF:
                 raise TimeoutError("End of partition reached")
+            if error.code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
+                raise TimeoutError("Topic not yet available")
             raise KafkaException(error)
 
         return KafkaMessage(msg, self._schema_cls)
@@ -236,6 +275,11 @@
         if sasl_password:
             self._admin_config['sasl.password'] = sasl_password
+        # Topics are created with 1 partition, so only 1 consumer
+        # per group can be active. Extra consumers cause rebalance
+        # storms that block message delivery.
+        self.max_consumer_concurrency = 1
+
         logger.info(
             f"Kafka backend: {bootstrap_servers} "
             f"protocol={security_protocol}"
         )
@@ -270,7 +314,10 @@
                 f"expected flow, request, response, or notify"
             )
 
-        topic_name = f"{topicspace}.{cls}.{topic}"
+        # Replace any remaining colons — flow topics can have
+        # extra segments (e.g. flow:tg:document-load:default)
+        # and Kafka rejects colons in topic names.
+        topic_name = f"{topicspace}.{cls}.{topic}".replace(':', '.')
 
         return topic_name, cls, durable
 
@@ -305,8 +352,13 @@
             # Per-subscriber: unique group so every instance sees
             # every message. Filter by correlation ID happens at
             # the Subscriber layer above.
+            # Use 'earliest' so that responses published before
+            # partition assignment completes are not missed.
+            # Each group is unique (UUID) with no committed offsets,
+            # so 'earliest' reads from the start of the topic.
+            # The correlation ID filter discards non-matching messages.
             group_id = f"{subscription}-{uuid.uuid4()}"
-            auto_offset_reset = 'latest'
+            auto_offset_reset = 'earliest'
         else:
             # Shared: named group, competing consumers
             group_id = subscription