diff --git a/docs/tech-specs/kafka-backend.md b/docs/tech-specs/kafka-backend.md new file mode 100644 index 00000000..178ddfe4 --- /dev/null +++ b/docs/tech-specs/kafka-backend.md @@ -0,0 +1,200 @@ +--- +layout: default +title: "Kafka Pub/Sub Backend Technical Specification" +parent: "Tech Specs" +--- + +# Kafka Pub/Sub Backend Technical Specification + +## Overview + +Add Apache Kafka as a third pub/sub backend alongside Pulsar and RabbitMQ. +Kafka's topic model maps naturally to TrustGraph's pub/sub abstraction: +topics are first-class, consumer groups provide competing-consumer +semantics, and the AdminClient handles topic lifecycle. + +## Problem + +TrustGraph currently supports Pulsar and RabbitMQ. Kafka is widely +deployed and operationally familiar to many teams. Its log-based +architecture provides durable, replayable message streams with +well-understood scaling properties. + +## Design + +### Concept Mapping + +| TrustGraph concept | Kafka equivalent | +|---|---| +| Topic (`class:topicspace:topic`) | Kafka topic (named `topicspace.class.topic`) | +| Subscription (competing consumers) | Consumer group | +| `create_topic` / `delete_topic` | `AdminClient.create_topics()` / `delete_topics()` | +| `ensure_topic` | `AdminClient.create_topics()` (idempotent) | +| Producer | `KafkaProducer` | +| Consumer | `KafkaConsumer` in a consumer group | +| Message acknowledge | Commit offset | +| Message negative acknowledge | Seek back to message offset | + +### Topic Naming + +The topic name follows the same convention as the RabbitMQ exchange +name: + +``` +class:topicspace:topic -> topicspace.class.topic +``` + +Examples: +- `flow:tg:text-completion-request` -> `tg.flow.text-completion-request` +- `request:tg:librarian` -> `tg.request.librarian` +- `response:tg:config` -> `tg.response.config` + +### Topic Classes and Retention + +Kafka topics are always durable (log-based). 
The class prefix determines +retention policy rather than durability: + +| Class | Retention | Partitions | Notes | +|---|---|---|---| +| `flow` | Long or infinite | 1 | Data pipeline, order preserved | +| `request` | Short (e.g. 300s) | 1 | RPC requests, ephemeral | +| `response` | Short (e.g. 300s) | 1 | RPC responses, shared (see below) | +| `notify` | Short (e.g. 300s) | 1 | Broadcast signals | + +Single partition per topic preserves message ordering and makes +offset-based acknowledgment equivalent to per-message ack. This matches +the current `prefetch_count=1` model used across all backends. + +### Producers + +Straightforward `KafkaProducer` wrapping. Messages are serialised as +JSON (consistent with the RabbitMQ backend). Message properties/headers +map to Kafka record headers. + +### Consumers + +#### Flow and Request Class (Competing Consumers) + +Consumer group ID = subscription name. Multiple consumers in the same +group share the workload (Kafka's native consumer group rebalancing). + +``` +group_id = subscription # e.g. "triples-store--default--input" +``` + +#### Response and Notify Class (Per-Subscriber) + +This is where Kafka differs from RabbitMQ. Kafka has no anonymous +exclusive auto-delete queues. + +Design: use a **shared response topic with unique consumer groups**. +Each subscriber gets its own consumer group (using the existing +UUID-based subscription name from `RequestResponseSpec`). Every +subscriber reads all messages from the topic and filters by correlation +ID, discarding non-matching messages. + +This is slightly wasteful — N subscribers each read every response — but +request/response traffic is low-volume compared to the data pipeline. +The alternative (per-instance temporary topics) would require dynamic +topic creation/deletion for every API gateway request, which is +expensive in Kafka (AdminClient operations involve controller +coordination). + +### Acknowledgment + +#### Acknowledge (Success) + +Commit the message's offset. 
With a single partition and sequential +processing, this is equivalent to per-message ack: + +``` +consumer.commit(offsets={partition: offset + 1}) +``` + +#### Negative Acknowledge (Failure / Retry) + +Kafka has no native nack-with-redelivery. On processing failure, seek +the consumer back to the failed message's offset: + +``` +consumer.seek(partition, offset) +``` + +The message is redelivered on the next poll. This matches the current +RabbitMQ `basic_nack(requeue=True)` behaviour: the message is retried +by the same consumer. + +### Topic Lifecycle + +The flow service creates and deletes topics via the Kafka AdminClient: + +- **Flow start**: `AdminClient.create_topics()` for each unique topic +  in the blueprint. Topic config includes `retention.ms` based on class. +- **Flow stop**: `AdminClient.delete_topics()` for the flow's topics. +- **Service startup**: `ensure_topic` creates the topic if it doesn't +  exist (idempotent: a `TOPIC_ALREADY_EXISTS` error from +  `create_topics` is treated as success). + +Unlike RabbitMQ where consumers declare their own queues, Kafka topics +must exist before consumers connect. The flow service and service +startup `ensure_topic` calls handle this. + +### Message Encoding + +JSON body, consistent with the RabbitMQ backend. Serialisation uses the +existing `dataclass_to_dict` / `dict_to_dataclass` helpers. Message +properties map to Kafka record headers (byte-encoded string values). + +### Configuration + +New CLI arguments following the existing pattern: + +``` +--pubsub-backend kafka +--kafka-bootstrap-servers localhost:9092 +--kafka-security-protocol PLAINTEXT +--kafka-sasl-mechanism (optional) +--kafka-sasl-username (optional) +--kafka-sasl-password (optional) +``` + +The factory in `pubsub.py` creates a `KafkaBackend` instance when +`pubsub_backend='kafka'`. + +### Dependencies + +`kafka-python-ng` or `confluent-kafka`. 
The `confluent-kafka` package +provides both producer/consumer and AdminClient in one library with +better performance (C-backed librdkafka), but requires a C extension +build. `kafka-python-ng` is pure Python and simpler to install. This +change declares `confluent-kafka` in `trustgraph-base/pyproject.toml`, +and the backend imports it directly. + +## Key Design Decisions + +1. **Shared response topic with filtering** over per-instance temporary +   topics. Avoids expensive dynamic topic creation for every RPC +   exchange. Acceptable because response traffic is low-volume. + +2. **Seek-back for negative acknowledge** over not-committing or retry +   topics. Provides immediate redelivery consistent with the RabbitMQ +   nack behaviour. + +3. **Single partition per topic** to preserve ordering and simplify +   offset management. Parallelism comes from multiple topics and +   multiple services, not from partitioning within a topic. + +4. **Retention-based class semantics** instead of durability flags. +   Kafka topics are always durable; short retention achieves the +   ephemeral behaviour needed for request/response/notify classes. + +## Open Questions + +- **Retention values**: exact `retention.ms` for short-lived topic +  classes. 300s (5 minutes) is a starting point; may need tuning based +  on worst-case restart/reconnect times. + +- **Library choice**: `confluent-kafka` is the dependency declared here; +  `kafka-python-ng` remains an alternative where the C extension build +  is a problem. Could support both behind a thin wrapper. + +- **Consumer poll timeout**: needs to align with the existing +  `receive(timeout_millis)` API. Kafka's `poll()` takes a timeout +  directly, so this should map cleanly. diff --git a/tests/unit/test_pubsub/test_kafka_backend.py b/tests/unit/test_pubsub/test_kafka_backend.py new file mode 100644 index 00000000..456386f0 --- /dev/null +++ b/tests/unit/test_pubsub/test_kafka_backend.py @@ -0,0 +1,131 @@ +""" +Unit tests for Kafka backend — topic parsing and factory dispatch. +Does not require a running Kafka instance. 
+""" + +import pytest +import argparse + +from trustgraph.base.kafka_backend import KafkaBackend +from trustgraph.base.pubsub import get_pubsub, add_pubsub_args + + +class TestKafkaParseTopic: + + @pytest.fixture + def backend(self): + b = object.__new__(KafkaBackend) + return b + + def test_flow_is_durable(self, backend): + name, cls, durable = backend._parse_topic('flow:tg:text-completion-request') + assert durable is True + assert cls == 'flow' + assert name == 'tg.flow.text-completion-request' + + def test_notify_is_not_durable(self, backend): + name, cls, durable = backend._parse_topic('notify:tg:config') + assert durable is False + assert cls == 'notify' + assert name == 'tg.notify.config' + + def test_request_is_not_durable(self, backend): + name, cls, durable = backend._parse_topic('request:tg:config') + assert durable is False + assert cls == 'request' + assert name == 'tg.request.config' + + def test_response_is_not_durable(self, backend): + name, cls, durable = backend._parse_topic('response:tg:librarian') + assert durable is False + assert cls == 'response' + assert name == 'tg.response.librarian' + + def test_custom_topicspace(self, backend): + name, cls, durable = backend._parse_topic('flow:prod:my-queue') + assert name == 'prod.flow.my-queue' + assert durable is True + + def test_no_colon_defaults_to_flow(self, backend): + name, cls, durable = backend._parse_topic('simple-queue') + assert name == 'tg.flow.simple-queue' + assert cls == 'flow' + assert durable is True + + def test_invalid_class_raises(self, backend): + with pytest.raises(ValueError, match="Invalid topic class"): + backend._parse_topic('unknown:tg:topic') + + def test_topic_with_flow_suffix(self, backend): + """Topic names with flow suffix (e.g. 
:default) are preserved.""" + name, cls, durable = backend._parse_topic('request:tg:prompt:default') + assert name == 'tg.request.prompt:default' + + +class TestKafkaRetention: + + @pytest.fixture + def backend(self): + b = object.__new__(KafkaBackend) + return b + + def test_flow_gets_long_retention(self, backend): + assert backend._retention_ms('flow') == 7 * 24 * 60 * 60 * 1000 + + def test_request_gets_short_retention(self, backend): + assert backend._retention_ms('request') == 300 * 1000 + + def test_response_gets_short_retention(self, backend): + assert backend._retention_ms('response') == 300 * 1000 + + def test_notify_gets_short_retention(self, backend): + assert backend._retention_ms('notify') == 300 * 1000 + + +class TestGetPubsubKafka: + + def test_factory_creates_kafka_backend(self): + backend = get_pubsub(pubsub_backend='kafka') + assert isinstance(backend, KafkaBackend) + + def test_factory_passes_config(self): + backend = get_pubsub( + pubsub_backend='kafka', + kafka_bootstrap_servers='myhost:9093', + kafka_security_protocol='SASL_SSL', + kafka_sasl_mechanism='PLAIN', + kafka_sasl_username='user', + kafka_sasl_password='pass', + ) + assert isinstance(backend, KafkaBackend) + assert backend._bootstrap_servers == 'myhost:9093' + assert backend._admin_config['security.protocol'] == 'SASL_SSL' + assert backend._admin_config['sasl.mechanism'] == 'PLAIN' + assert backend._admin_config['sasl.username'] == 'user' + assert backend._admin_config['sasl.password'] == 'pass' + + +class TestAddPubsubArgsKafka: + + def test_kafka_args_present(self): + parser = argparse.ArgumentParser() + add_pubsub_args(parser) + args = parser.parse_args([ + '--pubsub-backend', 'kafka', + '--kafka-bootstrap-servers', 'myhost:9093', + ]) + assert args.pubsub_backend == 'kafka' + assert args.kafka_bootstrap_servers == 'myhost:9093' + + def test_kafka_defaults_container(self): + parser = argparse.ArgumentParser() + add_pubsub_args(parser) + args = parser.parse_args([]) + assert 
args.kafka_bootstrap_servers == 'kafka:9092' + assert args.kafka_security_protocol == 'PLAINTEXT' + + def test_kafka_standalone_defaults_to_localhost(self): + parser = argparse.ArgumentParser() + add_pubsub_args(parser, standalone=True) + args = parser.parse_args([]) + assert args.kafka_bootstrap_servers == 'localhost:9092' diff --git a/tests/unit/test_pubsub/test_rabbitmq_backend.py b/tests/unit/test_pubsub/test_rabbitmq_backend.py index 54599723..5bafb78f 100644 --- a/tests/unit/test_pubsub/test_rabbitmq_backend.py +++ b/tests/unit/test_pubsub/test_rabbitmq_backend.py @@ -6,8 +6,6 @@ Does not require a running RabbitMQ instance. import pytest import argparse -pika = pytest.importorskip("pika", reason="pika not installed") - from trustgraph.base.rabbitmq_backend import RabbitMQBackend from trustgraph.base.pubsub import get_pubsub, add_pubsub_args diff --git a/trustgraph-base/pyproject.toml b/trustgraph-base/pyproject.toml index 4f1bce76..c4ef3c27 100644 --- a/trustgraph-base/pyproject.toml +++ b/trustgraph-base/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "requests", "python-logging-loki", "pika", + "confluent-kafka", "pyyaml", ] classifiers = [ diff --git a/trustgraph-base/trustgraph/base/kafka_backend.py b/trustgraph-base/trustgraph/base/kafka_backend.py new file mode 100644 index 00000000..8dfe8bfa --- /dev/null +++ b/trustgraph-base/trustgraph/base/kafka_backend.py @@ -0,0 +1,400 @@ +""" +Kafka backend implementation for pub/sub abstraction. + +Each logical topic maps to a Kafka topic. The topic name encodes +the full identity: + + class:topicspace:topic -> topicspace.class.topic + +Producers publish to the topic directly. +Consumers use consumer groups for competing-consumer semantics: + + - flow / request: named consumer group (competing consumers) + - response / notify: unique consumer group per instance, filtering + messages by correlation ID (all subscribers see all messages) + +The flow service manages topic lifecycle via AdminClient. 
+ +Architecture: + Producer --> [Kafka topic] --> Consumer Group A --> Consumer + --> Consumer Group A --> Consumer + --> Consumer Group B --> Consumer (response) +""" + +import asyncio +import json +import logging +import uuid +from typing import Any + +from confluent_kafka import ( + Producer as KafkaProducer, + Consumer as KafkaConsumer, + TopicPartition, + KafkaError, + KafkaException, +) +from confluent_kafka.admin import AdminClient, NewTopic + +from .backend import PubSubBackend, BackendProducer, BackendConsumer, Message +from .serialization import dataclass_to_dict, dict_to_dataclass + +logger = logging.getLogger(__name__) + +# Retention defaults (milliseconds) +LONG_RETENTION_MS = 7 * 24 * 60 * 60 * 1000 # 7 days +SHORT_RETENTION_MS = 300 * 1000 # 5 minutes + + +class KafkaMessage: + """Wrapper for Kafka messages to match Message protocol.""" + + def __init__(self, msg, schema_cls): + self._msg = msg + self._schema_cls = schema_cls + self._value = None + + def value(self) -> Any: + """Deserialize and return the message value as a dataclass.""" + if self._value is None: + data_dict = json.loads(self._msg.value().decode('utf-8')) + self._value = dict_to_dataclass(data_dict, self._schema_cls) + return self._value + + def properties(self) -> dict: + """Return message properties from Kafka headers.""" + headers = self._msg.headers() or [] + return { + k: v.decode('utf-8') if isinstance(v, bytes) else v + for k, v in headers + } + + +class KafkaBackendProducer: + """Publishes messages to a Kafka topic. + + confluent-kafka Producer is thread-safe, so a single instance + can be shared across threads. 
+ """ + + def __init__(self, bootstrap_servers, topic_name, durable): + self._topic_name = topic_name + self._durable = durable + self._producer = KafkaProducer({ + 'bootstrap.servers': bootstrap_servers, + 'acks': 'all' if durable else '1', + }) + + def send(self, message: Any, properties: dict = {}) -> None: + data_dict = dataclass_to_dict(message) + json_data = json.dumps(data_dict).encode('utf-8') + + headers = [ + (k, str(v).encode('utf-8')) + for k, v in properties.items() + ] if properties else None + + self._producer.produce( + topic=self._topic_name, + value=json_data, + headers=headers, + ) + self._producer.flush() + + def flush(self) -> None: + self._producer.flush() + + def close(self) -> None: + self._producer.flush() + + +class KafkaBackendConsumer: + """Consumes from a Kafka topic using a consumer group. + + Uses confluent-kafka Consumer.poll() for message delivery. + Not thread-safe — each instance must be used from a single thread, + which matches the ThreadPoolExecutor pattern in consumer.py. + """ + + def __init__(self, bootstrap_servers, topic_name, group_id, + schema_cls, auto_offset_reset='latest'): + self._bootstrap_servers = bootstrap_servers + self._topic_name = topic_name + self._group_id = group_id + self._schema_cls = schema_cls + self._auto_offset_reset = auto_offset_reset + self._consumer = None + + def _connect(self): + self._consumer = KafkaConsumer({ + 'bootstrap.servers': self._bootstrap_servers, + 'group.id': self._group_id, + 'auto.offset.reset': self._auto_offset_reset, + 'enable.auto.commit': False, + }) + self._consumer.subscribe([self._topic_name]) + logger.info( + f"Kafka consumer connected: topic={self._topic_name}, " + f"group={self._group_id}" + ) + + def _is_alive(self): + return self._consumer is not None + + def ensure_connected(self) -> None: + """Eagerly connect and subscribe. 
+ + For response/notify consumers this must be called before the + corresponding request is published, so that the consumer is + assigned a partition and will see the response message. + """ + if not self._is_alive(): + self._connect() + + # Force a partition assignment by polling briefly. + # Without this, the consumer may not be assigned partitions + # until the first real poll(), creating a race where the + # request is sent before assignment completes. + self._consumer.poll(timeout=1.0) + + def receive(self, timeout_millis: int = 2000) -> Message: + """Receive a message. Raises TimeoutError if none available.""" + if not self._is_alive(): + self._connect() + + timeout_seconds = timeout_millis / 1000.0 + msg = self._consumer.poll(timeout=timeout_seconds) + + if msg is None: + raise TimeoutError("No message received within timeout") + + if msg.error(): + error = msg.error() + if error.code() == KafkaError._PARTITION_EOF: + raise TimeoutError("End of partition reached") + raise KafkaException(error) + + return KafkaMessage(msg, self._schema_cls) + + def acknowledge(self, message: Message) -> None: + """Commit the message's offset (next offset to read).""" + if isinstance(message, KafkaMessage) and message._msg: + tp = TopicPartition( + message._msg.topic(), + message._msg.partition(), + message._msg.offset() + 1, + ) + self._consumer.commit(offsets=[tp], asynchronous=False) + + def negative_acknowledge(self, message: Message) -> None: + """Seek back to the message's offset for redelivery.""" + if isinstance(message, KafkaMessage) and message._msg: + tp = TopicPartition( + message._msg.topic(), + message._msg.partition(), + message._msg.offset(), + ) + self._consumer.seek(tp) + + def unsubscribe(self) -> None: + if self._consumer: + try: + self._consumer.unsubscribe() + except Exception: + pass + + def close(self) -> None: + if self._consumer: + try: + self._consumer.close() + except Exception: + pass + self._consumer = None + + +class KafkaBackend: + """Kafka 
pub/sub backend using one topic per logical topic.""" + + def __init__(self, bootstrap_servers='localhost:9092', + security_protocol='PLAINTEXT', + sasl_mechanism=None, + sasl_username=None, + sasl_password=None): + self._bootstrap_servers = bootstrap_servers + + # AdminClient config + self._admin_config = { + 'bootstrap.servers': bootstrap_servers, + } + + if security_protocol != 'PLAINTEXT': + self._admin_config['security.protocol'] = security_protocol + if sasl_mechanism: + self._admin_config['sasl.mechanism'] = sasl_mechanism + if sasl_username: + self._admin_config['sasl.username'] = sasl_username + if sasl_password: + self._admin_config['sasl.password'] = sasl_password + + logger.info( + f"Kafka backend: {bootstrap_servers} " + f"protocol={security_protocol}" + ) + + def _parse_topic(self, topic_id: str) -> tuple[str, str, bool]: + """ + Parse topic identifier into Kafka topic name, class, and durability. + + Format: class:topicspace:topic + Returns: (topic_name, class, durable) + """ + if ':' not in topic_id: + return f'tg.flow.{topic_id}', 'flow', True + + parts = topic_id.split(':', 2) + if len(parts) != 3: + raise ValueError( + f"Invalid topic format: {topic_id}, " + f"expected class:topicspace:topic" + ) + + cls, topicspace, topic = parts + + if cls == 'flow': + durable = True + elif cls in ('request', 'response', 'notify'): + durable = False + else: + raise ValueError( + f"Invalid topic class: {cls}, " + f"expected flow, request, response, or notify" + ) + + topic_name = f"{topicspace}.{cls}.{topic}" + + return topic_name, cls, durable + + def _retention_ms(self, cls): + """Return retention.ms for a topic class.""" + if cls == 'flow': + return LONG_RETENTION_MS + return SHORT_RETENTION_MS + + def create_producer(self, topic: str, schema: type, + **options) -> BackendProducer: + topic_name, cls, durable = self._parse_topic(topic) + logger.debug(f"Creating producer: topic={topic_name}") + return KafkaBackendProducer( + self._bootstrap_servers, topic_name, 
durable, + ) + + def create_consumer(self, topic: str, subscription: str, schema: type, + initial_position: str = 'latest', + **options) -> BackendConsumer: + """Create a consumer subscribed to a Kafka topic. + + Behaviour is determined by the topic's class prefix: + - flow: named consumer group, competing consumers + - request: named consumer group, competing consumers + - response: unique consumer group per instance + - notify: unique consumer group per instance + """ + topic_name, cls, durable = self._parse_topic(topic) + + if cls in ('response', 'notify'): + # Per-subscriber: unique group so every instance sees + # every message. Filter by correlation ID happens at + # the Subscriber layer above. + group_id = f"{subscription}-{uuid.uuid4()}" + auto_offset_reset = 'latest' + else: + # Shared: named group, competing consumers + group_id = subscription + auto_offset_reset = ( + 'earliest' if initial_position == 'earliest' + else 'latest' + ) + + logger.debug( + f"Creating consumer: topic={topic_name}, " + f"group={group_id}, cls={cls}" + ) + + return KafkaBackendConsumer( + self._bootstrap_servers, topic_name, group_id, + schema, auto_offset_reset, + ) + + def _create_topic_sync(self, topic_name, retention_ms): + """Blocking topic creation via AdminClient.""" + admin = AdminClient(self._admin_config) + new_topic = NewTopic( + topic_name, + num_partitions=1, + replication_factor=1, + config={ + 'retention.ms': str(retention_ms), + }, + ) + fs = admin.create_topics([new_topic]) + for name, f in fs.items(): + try: + f.result() + logger.info(f"Created topic: {name}") + except KafkaException as e: + # Topic already exists — idempotent + if e.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS: + logger.debug(f"Topic already exists: {name}") + else: + raise + + async def create_topic(self, topic: str) -> None: + """Create a Kafka topic with appropriate retention.""" + topic_name, cls, durable = self._parse_topic(topic) + retention_ms = self._retention_ms(cls) + await 
asyncio.to_thread( + self._create_topic_sync, topic_name, retention_ms, + ) + + def _delete_topic_sync(self, topic_name): + """Blocking topic deletion via AdminClient.""" + admin = AdminClient(self._admin_config) + fs = admin.delete_topics([topic_name]) + for name, f in fs.items(): + try: + f.result() + logger.info(f"Deleted topic: {name}") + except KafkaException as e: + # Topic doesn't exist — idempotent + if e.args[0].code() == KafkaError.UNKNOWN_TOPIC_OR_PART: + logger.debug(f"Topic not found: {name}") + else: + raise + except Exception as e: + logger.debug(f"Topic delete for {name}: {e}") + + async def delete_topic(self, topic: str) -> None: + """Delete a Kafka topic.""" + topic_name, cls, durable = self._parse_topic(topic) + await asyncio.to_thread(self._delete_topic_sync, topic_name) + + def _topic_exists_sync(self, topic_name): + """Blocking topic existence check via AdminClient.""" + admin = AdminClient(self._admin_config) + metadata = admin.list_topics(timeout=10) + return topic_name in metadata.topics + + async def topic_exists(self, topic: str) -> bool: + """Check whether a Kafka topic exists.""" + topic_name, cls, durable = self._parse_topic(topic) + return await asyncio.to_thread( + self._topic_exists_sync, topic_name, + ) + + async def ensure_topic(self, topic: str) -> None: + """Ensure a topic exists, creating it if necessary.""" + if not await self.topic_exists(topic): + await self.create_topic(topic) + + def close(self) -> None: + pass diff --git a/trustgraph-base/trustgraph/base/pubsub.py b/trustgraph-base/trustgraph/base/pubsub.py index a7ae3719..fb4765c1 100644 --- a/trustgraph-base/trustgraph/base/pubsub.py +++ b/trustgraph-base/trustgraph/base/pubsub.py @@ -17,6 +17,12 @@ DEFAULT_RABBITMQ_USERNAME = os.getenv("RABBITMQ_USERNAME", 'guest') DEFAULT_RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD", 'guest') DEFAULT_RABBITMQ_VHOST = os.getenv("RABBITMQ_VHOST", '/') +DEFAULT_KAFKA_BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP_SERVERS", 'kafka:9092') 
+DEFAULT_KAFKA_PROTOCOL = os.getenv("KAFKA_SECURITY_PROTOCOL", 'PLAINTEXT') +DEFAULT_KAFKA_SASL_MECHANISM = os.getenv("KAFKA_SASL_MECHANISM", None) +DEFAULT_KAFKA_SASL_USERNAME = os.getenv("KAFKA_SASL_USERNAME", None) +DEFAULT_KAFKA_SASL_PASSWORD = os.getenv("KAFKA_SASL_PASSWORD", None) + def get_pubsub(**config: Any) -> Any: """ @@ -47,6 +53,25 @@ def get_pubsub(**config: Any) -> Any: password=config.get('rabbitmq_password', DEFAULT_RABBITMQ_PASSWORD), vhost=config.get('rabbitmq_vhost', DEFAULT_RABBITMQ_VHOST), ) + elif backend_type == 'kafka': + from .kafka_backend import KafkaBackend + return KafkaBackend( + bootstrap_servers=config.get( + 'kafka_bootstrap_servers', DEFAULT_KAFKA_BOOTSTRAP, + ), + security_protocol=config.get( + 'kafka_security_protocol', DEFAULT_KAFKA_PROTOCOL, + ), + sasl_mechanism=config.get( + 'kafka_sasl_mechanism', DEFAULT_KAFKA_SASL_MECHANISM, + ), + sasl_username=config.get( + 'kafka_sasl_username', DEFAULT_KAFKA_SASL_USERNAME, + ), + sasl_password=config.get( + 'kafka_sasl_password', DEFAULT_KAFKA_SASL_PASSWORD, + ), + ) else: raise ValueError(f"Unknown pub/sub backend: {backend_type}") @@ -65,6 +90,7 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None: pulsar_host = STANDALONE_PULSAR_HOST if standalone else DEFAULT_PULSAR_HOST pulsar_listener = 'localhost' if standalone else None rabbitmq_host = 'localhost' if standalone else DEFAULT_RABBITMQ_HOST + kafka_bootstrap = 'localhost:9092' if standalone else DEFAULT_KAFKA_BOOTSTRAP parser.add_argument( '--pubsub-backend', @@ -122,3 +148,34 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None: default=DEFAULT_RABBITMQ_VHOST, help=f'RabbitMQ vhost (default: {DEFAULT_RABBITMQ_VHOST})', ) + + # Kafka options + parser.add_argument( + '--kafka-bootstrap-servers', + default=kafka_bootstrap, + help=f'Kafka bootstrap servers (default: {kafka_bootstrap})', + ) + + parser.add_argument( + '--kafka-security-protocol', + 
default=DEFAULT_KAFKA_PROTOCOL, + help=f'Kafka security protocol (default: {DEFAULT_KAFKA_PROTOCOL})', + ) + + parser.add_argument( + '--kafka-sasl-mechanism', + default=DEFAULT_KAFKA_SASL_MECHANISM, + help='Kafka SASL mechanism', + ) + + parser.add_argument( + '--kafka-sasl-username', + default=DEFAULT_KAFKA_SASL_USERNAME, + help='Kafka SASL username', + ) + + parser.add_argument( + '--kafka-sasl-password', + default=DEFAULT_KAFKA_SASL_PASSWORD, + help='Kafka SASL password', + )
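As a footnote to the acknowledge/negative-acknowledge design in the spec, the offset semantics (ack commits offset + 1; nack seeks back so the same message is redelivered on the next poll, like RabbitMQ `basic_nack(requeue=True)`) can be illustrated with a toy single-partition log. This is pure illustration with hypothetical names; no Kafka client is involved:

```python
class OffsetLog:
    """Toy single-partition log demonstrating the offset-based
    ack/nack semantics used by the Kafka backend design."""

    def __init__(self, records):
        self.records = records
        self.position = 0    # next offset poll() will deliver
        self.committed = 0   # consumer-group committed offset

    def poll(self):
        """Deliver the next (offset, value) pair, or None at end of log."""
        if self.position >= len(self.records):
            return None
        msg = (self.position, self.records[self.position])
        self.position += 1
        return msg

    def ack(self, offset):
        """Acknowledge: commit the NEXT offset to read (offset + 1)."""
        self.committed = offset + 1

    def nack(self, offset):
        """Negative acknowledge: seek back so the message is redelivered."""
        self.position = offset


log = OffsetLog(['a', 'b'])
off, _ = log.poll()          # delivers 'a' at offset 0
log.nack(off)                # seek back: offset 0 will be polled again
assert log.poll() == (0, 'a')
log.ack(0)                   # commit 1 as the next offset to read
```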