release/v2.3 -> master (#837)

This commit is contained in:
cybermaggedon 2026-04-21 16:30:02 +01:00 committed by GitHub
parent 222537c26b
commit a24df8e990
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
431 changed files with 244 additions and 49 deletions

View file

@@ -54,6 +54,17 @@ class Consumer:
self.running = True
self.consumer_task = None
# Kafka topics are created with 1 partition, so multiple
# consumers in the same group cause rebalance storms in which
# no consumer can fetch. Cap to the backend's limit.
max_concurrency = getattr(backend, 'max_consumer_concurrency', None)
if isinstance(max_concurrency, int) and concurrency > max_concurrency:
logger.info(
f"Capping concurrency from {concurrency} to "
f"{max_concurrency} (backend limit)"
)
concurrency = max_concurrency
self.concurrency = concurrency
self.metrics = metrics

View file

@@ -83,6 +83,7 @@ class KafkaBackendProducer:
self._producer = KafkaProducer({
'bootstrap.servers': bootstrap_servers,
'acks': 'all' if durable else '1',
'message.max.bytes': 10485760,
})
def send(self, message: Any, properties: dict = {}) -> None:
@@ -94,13 +95,23 @@ class KafkaBackendProducer:
for k, v in properties.items()
] if properties else None
self._delivery_error = None
def _on_delivery(err, msg):
if err:
self._delivery_error = err
self._producer.produce(
topic=self._topic_name,
value=json_data,
headers=headers,
on_delivery=_on_delivery,
)
self._producer.flush()
if self._delivery_error:
raise KafkaException(self._delivery_error)
def flush(self) -> None:
    # Block until every message previously handed to produce() has been
    # delivered (or failed); delegates to the underlying Kafka producer.
    self._producer.flush()
@@ -126,15 +137,41 @@ class KafkaBackendConsumer:
self._consumer = None
def _connect(self):
import time
t0 = time.monotonic()
def _on_assign(consumer, partitions):
elapsed = time.monotonic() - t0
logger.info(
f"Partition assignment for {self._topic_name}: "
f"{[p.partition for p in partitions]} "
f"after {elapsed:.1f}s"
)
def _on_revoke(consumer, partitions):
logger.info(
f"Partition revoke for {self._topic_name}: "
f"{[p.partition for p in partitions]}"
)
self._consumer = KafkaConsumer({
'bootstrap.servers': self._bootstrap_servers,
'group.id': self._group_id,
'auto.offset.reset': self._auto_offset_reset,
'enable.auto.commit': False,
'fetch.message.max.bytes': 10485760,
# Tighten group coordination timeouts for fast
# group join on single-member groups.
'session.timeout.ms': 6000,
'heartbeat.interval.ms': 1000,
})
self._consumer.subscribe([self._topic_name])
self._consumer.subscribe(
[self._topic_name],
on_assign=_on_assign,
on_revoke=_on_revoke,
)
logger.info(
f"Kafka consumer connected: topic={self._topic_name}, "
f"Kafka consumer subscribed: topic={self._topic_name}, "
f"group={self._group_id}"
)
@@ -151,11 +188,11 @@ class KafkaBackendConsumer:
if not self._is_alive():
self._connect()
# Force a partition assignment by polling briefly.
# Without this, the consumer may not be assigned partitions
# until the first real poll(), creating a race where the
# request is sent before assignment completes.
self._consumer.poll(timeout=1.0)
# Kick off group join. With auto.offset.reset=earliest
# on response/notify consumers, any messages published
# before assignment completes will be picked up once
# the consumer starts polling in receive().
self._consumer.poll(timeout=0.5)
def receive(self, timeout_millis: int = 2000) -> Message:
"""Receive a message. Raises TimeoutError if none available."""
@@ -172,6 +209,8 @@ class KafkaBackendConsumer:
error = msg.error()
if error.code() == KafkaError._PARTITION_EOF:
raise TimeoutError("End of partition reached")
if error.code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
raise TimeoutError("Topic not yet available")
raise KafkaException(error)
return KafkaMessage(msg, self._schema_cls)
@@ -236,6 +275,11 @@ class KafkaBackend:
if sasl_password:
self._admin_config['sasl.password'] = sasl_password
# Topics are created with 1 partition, so only 1 consumer
# per group can be active. Extra consumers cause rebalance
# storms that block message delivery.
self.max_consumer_concurrency = 1
logger.info(
f"Kafka backend: {bootstrap_servers} "
f"protocol={security_protocol}"
@@ -270,7 +314,10 @@ class KafkaBackend:
f"expected flow, request, response, or notify"
)
topic_name = f"{topicspace}.{cls}.{topic}"
# Replace any remaining colons — flow topics can have
# extra segments (e.g. flow:tg:document-load:default)
# and Kafka rejects colons in topic names.
topic_name = f"{topicspace}.{cls}.{topic}".replace(':', '.')
return topic_name, cls, durable
@@ -305,8 +352,13 @@ class KafkaBackend:
# Per-subscriber: unique group so every instance sees
# every message. Filter by correlation ID happens at
# the Subscriber layer above.
# Use 'earliest' so that responses published before
# partition assignment completes are not missed.
# Each group is unique (UUID) with no committed offsets,
# so 'earliest' reads from the start of the topic.
# The correlation ID filter discards non-matching messages.
group_id = f"{subscription}-{uuid.uuid4()}"
auto_offset_reset = 'latest'
auto_offset_reset = 'earliest'
else:
# Shared: named group, competing consumers
group_id = subscription