Mirror of https://github.com/trustgraph-ai/trustgraph.git, synced 2026-04-25 00:16:23 +02:00
fix: resolve multiple Kafka backend issues blocking message delivery (#833)
- Producer: add delivery callback to surface send errors instead of silently swallowing them, and raise message.max.bytes to 10MB
- Consumer: raise fetch.message.max.bytes to 10MB to match the producer, tighten session/heartbeat timeouts for fast group joins, and add partition assign/revoke logging for diagnostics
- Topic naming: replace colons with dots in topic names, since Kafka rejects colons (flow:tg:document-load:default was producing the invalid topic name tg.flow.document-load:default)
- Response consumers: use auto.offset.reset=earliest instead of latest so responses published before partition assignment aren't lost
- UNKNOWN_TOPIC_OR_PART: treat as a timeout instead of a fatal error so consumers wait for auto-created topics instead of crashing
- Concurrency: cap consumer workers to 1 for Kafka, since topics have 1 partition; extra consumers trigger rebalance storms that block all message delivery
parent adea976203
commit b615150624
2 changed files with 72 additions and 9 deletions
@@ -54,6 +54,17 @@ class Consumer:
         self.running = True
         self.consumer_task = None
 
+        # Kafka topics are created with 1 partition, so multiple
+        # consumers in the same group causes rebalance storms where
+        # no consumer can fetch. Cap to the backend's limit.
+        max_concurrency = getattr(backend, 'max_consumer_concurrency', None)
+        if max_concurrency is not None and concurrency > max_concurrency:
+            logger.info(
+                f"Capping concurrency from {concurrency} to "
+                f"{max_concurrency} (backend limit)"
+            )
+            concurrency = max_concurrency
+
         self.concurrency = concurrency
         self.metrics = metrics
 
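The capping logic above can be exercised on its own. A minimal sketch, using an illustrative stand-in backend and a free-standing helper (FakeBackend and cap_concurrency are not names from the codebase):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FakeBackend:
    # Mirrors the attribute the Kafka backend now publishes
    # (see the KafkaBackend hunk further down).
    max_consumer_concurrency = 1

def cap_concurrency(backend, concurrency):
    # Same pattern as the constructor change: honour an optional,
    # backend-declared ceiling on the number of consumer workers.
    max_concurrency = getattr(backend, 'max_consumer_concurrency', None)
    if max_concurrency is not None and concurrency > max_concurrency:
        logger.info(
            f"Capping concurrency from {concurrency} to "
            f"{max_concurrency} (backend limit)"
        )
        concurrency = max_concurrency
    return concurrency

print(cap_concurrency(FakeBackend(), 4))   # prints 1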
@@ -83,6 +83,7 @@ class KafkaBackendProducer:
         self._producer = KafkaProducer({
             'bootstrap.servers': bootstrap_servers,
             'acks': 'all' if durable else '1',
+            'message.max.bytes': 10485760,
         })
 
     def send(self, message: Any, properties: dict = {}) -> None:
@@ -94,13 +95,23 @@ class KafkaBackendProducer:
             for k, v in properties.items()
         ] if properties else None
 
+        self._delivery_error = None
+
+        def _on_delivery(err, msg):
+            if err:
+                self._delivery_error = err
+
         self._producer.produce(
             topic=self._topic_name,
             value=json_data,
             headers=headers,
+            on_delivery=_on_delivery,
         )
         self._producer.flush()
 
+        if self._delivery_error:
+            raise KafkaException(self._delivery_error)
+
     def flush(self) -> None:
         self._producer.flush()
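For context, the same produce/flush/raise pattern as a standalone sketch against the confluent-kafka Python client (the broker address, topic, and payload are placeholders). Without a delivery callback, a broker-side rejection such as an oversized message is only visible in client logs; with it, the error is raised to the caller:

from confluent_kafka import Producer, KafkaException

producer = Producer({
    'bootstrap.servers': 'localhost:9092',   # placeholder broker
    'message.max.bytes': 10485760,           # allow payloads up to 10MB
})

delivery_error = None

def on_delivery(err, msg):
    # Called from flush()/poll() once the broker acks or rejects the message.
    global delivery_error
    if err:
        delivery_error = err

producer.produce('example-topic', value=b'payload', on_delivery=on_delivery)
producer.flush()   # block until the delivery report has been handled

if delivery_error:
    raise KafkaException(delivery_error)   # surface the send error to the caller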
@@ -126,15 +137,41 @@ class KafkaBackendConsumer:
         self._consumer = None
 
     def _connect(self):
+        import time
+        t0 = time.monotonic()
+
+        def _on_assign(consumer, partitions):
+            elapsed = time.monotonic() - t0
+            logger.info(
+                f"Partition assignment for {self._topic_name}: "
+                f"{[p.partition for p in partitions]} "
+                f"after {elapsed:.1f}s"
+            )
+
+        def _on_revoke(consumer, partitions):
+            logger.info(
+                f"Partition revoke for {self._topic_name}: "
+                f"{[p.partition for p in partitions]}"
+            )
+
         self._consumer = KafkaConsumer({
             'bootstrap.servers': self._bootstrap_servers,
             'group.id': self._group_id,
             'auto.offset.reset': self._auto_offset_reset,
             'enable.auto.commit': False,
+            'fetch.message.max.bytes': 10485760,
+            # Tighten group coordination timeouts for fast
+            # group join on single-member groups.
+            'session.timeout.ms': 6000,
+            'heartbeat.interval.ms': 1000,
         })
-        self._consumer.subscribe([self._topic_name])
+        self._consumer.subscribe(
+            [self._topic_name],
+            on_assign=_on_assign,
+            on_revoke=_on_revoke,
+        )
         logger.info(
-            f"Kafka consumer connected: topic={self._topic_name}, "
+            f"Kafka consumer subscribed: topic={self._topic_name}, "
             f"group={self._group_id}"
         )
@@ -151,11 +188,11 @@ class KafkaBackendConsumer:
         if not self._is_alive():
             self._connect()
 
-        # Force a partition assignment by polling briefly.
-        # Without this, the consumer may not be assigned partitions
-        # until the first real poll(), creating a race where the
-        # request is sent before assignment completes.
-        self._consumer.poll(timeout=1.0)
+        # Kick off group join. With auto.offset.reset=earliest
+        # on response/notify consumers, any messages published
+        # before assignment completes will be picked up once
+        # the consumer starts polling in receive().
+        self._consumer.poll(timeout=0.5)
 
     def receive(self, timeout_millis: int = 2000) -> Message:
         """Receive a message. Raises TimeoutError if none available."""
@@ -172,6 +209,8 @@ class KafkaBackendConsumer:
             error = msg.error()
             if error.code() == KafkaError._PARTITION_EOF:
                 raise TimeoutError("End of partition reached")
+            if error.code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
+                raise TimeoutError("Topic not yet available")
             raise KafkaException(error)
 
         return KafkaMessage(msg, self._schema_cls)
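Mapping UNKNOWN_TOPIC_OR_PART to TimeoutError means a polling caller can simply retry while the broker auto-creates the topic, instead of failing on the first poll. A hypothetical retry loop (the real Subscriber layer may differ):

import time

def receive_with_retry(consumer, overall_timeout=30.0):
    # TimeoutError now covers both "no message yet" and
    # "topic not yet auto-created", so both cases are retried.
    deadline = time.monotonic() + overall_timeout
    while time.monotonic() < deadline:
        try:
            return consumer.receive(timeout_millis=2000)
        except TimeoutError:
            continue
    raise TimeoutError("No message received before the overall deadline")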
@@ -236,6 +275,11 @@ class KafkaBackend:
         if sasl_password:
             self._admin_config['sasl.password'] = sasl_password
 
+        # Topics are created with 1 partition, so only 1 consumer
+        # per group can be active. Extra consumers cause rebalance
+        # storms that block message delivery.
+        self.max_consumer_concurrency = 1
+
         logger.info(
             f"Kafka backend: {bootstrap_servers} "
             f"protocol={security_protocol}"
@@ -270,7 +314,10 @@ class KafkaBackend:
             f"expected flow, request, response, or notify"
         )
 
-        topic_name = f"{topicspace}.{cls}.{topic}"
+        # Replace any remaining colons — flow topics can have
+        # extra segments (e.g. flow:tg:document-load:default)
+        # and Kafka rejects colons in topic names.
+        topic_name = f"{topicspace}.{cls}.{topic}".replace(':', '.')
 
         return topic_name, cls, durable
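A quick check of the sanitisation against the example from the commit message; the split of flow:tg:document-load:default into topicspace, cls, and topic is assumed here, not taken from the code:

topicspace, cls, topic = "tg", "flow", "document-load:default"   # assumed split
topic_name = f"{topicspace}.{cls}.{topic}".replace(':', '.')
print(topic_name)   # tg.flow.document-load.default
# Kafka topic names may only contain letters, digits, '.', '_' and '-',
# so the remaining colon had to go.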
@@ -305,8 +352,13 @@ class KafkaBackend:
             # Per-subscriber: unique group so every instance sees
             # every message. Filter by correlation ID happens at
             # the Subscriber layer above.
+            # Use 'earliest' so that responses published before
+            # partition assignment completes are not missed.
+            # Each group is unique (UUID) with no committed offsets,
+            # so 'earliest' reads from the start of the topic.
+            # The correlation ID filter discards non-matching messages.
             group_id = f"{subscription}-{uuid.uuid4()}"
-            auto_offset_reset = 'latest'
+            auto_offset_reset = 'earliest'
         else:
             # Shared: named group, competing consumers
             group_id = subscription
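A minimal sketch of the per-subscriber consumer configuration this branch produces (the subscription name and broker address are placeholders). Because each UUID group is new and has no committed offsets, 'earliest' makes the first assignment start at the beginning of the response topic, so a response published before the group join completes is still read; the correlation-ID filter in the layer above then discards anything not addressed to this subscriber:

import uuid
from confluent_kafka import Consumer

subscription = "response-handler"                  # placeholder
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',         # placeholder broker
    'group.id': f"{subscription}-{uuid.uuid4()}",  # unique group per subscriber
    'auto.offset.reset': 'earliest',               # don't skip pre-assignment responses
    'enable.auto.commit': False,
})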
Loading…
Add table
Add a link
Reference in a new issue