diff --git a/docs/tech-specs/kafka-backend.md b/docs/tech-specs/kafka-backend.md
new file mode 100644
index 00000000..178ddfe4
--- /dev/null
+++ b/docs/tech-specs/kafka-backend.md
@@ -0,0 +1,200 @@
---
layout: default
title: "Kafka Pub/Sub Backend Technical Specification"
parent: "Tech Specs"
---

# Kafka Pub/Sub Backend Technical Specification

## Overview

Add Apache Kafka as a third pub/sub backend alongside Pulsar and RabbitMQ.
Kafka's topic model maps naturally to TrustGraph's pub/sub abstraction:
topics are first-class, consumer groups provide competing-consumer
semantics, and the AdminClient handles topic lifecycle.

## Problem

TrustGraph currently supports Pulsar and RabbitMQ. Kafka is widely
deployed and operationally familiar to many teams. Its log-based
architecture provides durable, replayable message streams with
well-understood scaling properties.

## Design

### Concept Mapping

| TrustGraph concept | Kafka equivalent |
|---|---|
| Topic (`class:topicspace:topic`) | Kafka topic (named `topicspace.class.topic`) |
| Subscription (competing consumers) | Consumer group |
| `create_topic` / `delete_topic` | `AdminClient.create_topics()` / `delete_topics()` |
| `ensure_topic` | `AdminClient.create_topics()` (idempotent) |
| Producer | `KafkaProducer` |
| Consumer | `KafkaConsumer` in a consumer group |
| Message acknowledge | Commit offset |
| Message negative acknowledge | Seek back to message offset |

### Topic Naming

The topic name follows the same convention as the RabbitMQ exchange
name:

```
class:topicspace:topic -> topicspace.class.topic
```

Examples:
- `flow:tg:text-completion-request` -> `tg.flow.text-completion-request`
- `request:tg:librarian` -> `tg.request.librarian`
- `response:tg:config` -> `tg.response.config`

### Topic Classes and Retention

Kafka topics are always durable (log-based).
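The naming rule above can be sketched as a small standalone helper. It
mirrors the `_parse_topic` method this change adds to the Kafka backend,
but the function name is illustrative and durability handling is
omitted:

```python
def parse_topic(topic_id):
    """Map class:topicspace:topic -> (kafka_topic_name, topic_class)."""
    if ':' not in topic_id:
        # Bare names default to the flow class in the tg topicspace.
        return f'tg.flow.{topic_id}', 'flow'
    parts = topic_id.split(':', 2)
    if len(parts) != 3:
        raise ValueError(f"Invalid topic format: {topic_id}")
    cls, topicspace, topic = parts
    if cls not in ('flow', 'request', 'response', 'notify'):
        raise ValueError(f"Invalid topic class: {cls}")
    # maxsplit=2 keeps flow suffixes (e.g. ':default') in the last part.
    return f'{topicspace}.{cls}.{topic}', cls

parse_topic('request:tg:prompt:default')
# -> ('tg.request.prompt:default', 'request')
```

The behaviour matches the `_parse_topic` unit tests added in this
change, including the default-to-`flow` case and suffix preservation.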
The class prefix determines +retention policy rather than durability: + +| Class | Retention | Partitions | Notes | +|---|---|---|---| +| `flow` | Long or infinite | 1 | Data pipeline, order preserved | +| `request` | Short (e.g. 300s) | 1 | RPC requests, ephemeral | +| `response` | Short (e.g. 300s) | 1 | RPC responses, shared (see below) | +| `notify` | Short (e.g. 300s) | 1 | Broadcast signals | + +Single partition per topic preserves message ordering and makes +offset-based acknowledgment equivalent to per-message ack. This matches +the current `prefetch_count=1` model used across all backends. + +### Producers + +Straightforward `KafkaProducer` wrapping. Messages are serialised as +JSON (consistent with the RabbitMQ backend). Message properties/headers +map to Kafka record headers. + +### Consumers + +#### Flow and Request Class (Competing Consumers) + +Consumer group ID = subscription name. Multiple consumers in the same +group share the workload (Kafka's native consumer group rebalancing). + +``` +group_id = subscription # e.g. "triples-store--default--input" +``` + +#### Response and Notify Class (Per-Subscriber) + +This is where Kafka differs from RabbitMQ. Kafka has no anonymous +exclusive auto-delete queues. + +Design: use a **shared response topic with unique consumer groups**. +Each subscriber gets its own consumer group (using the existing +UUID-based subscription name from `RequestResponseSpec`). Every +subscriber reads all messages from the topic and filters by correlation +ID, discarding non-matching messages. + +This is slightly wasteful — N subscribers each read every response — but +request/response traffic is low-volume compared to the data pipeline. +The alternative (per-instance temporary topics) would require dynamic +topic creation/deletion for every API gateway request, which is +expensive in Kafka (AdminClient operations involve controller +coordination). + +### Acknowledgment + +#### Acknowledge (Success) + +Commit the message's offset. 
With a single partition and sequential
processing, this is equivalent to per-message ack:

```
consumer.commit(offsets={partition: offset + 1})
```

#### Negative Acknowledge (Failure / Retry)

Kafka has no native nack-with-redelivery. On processing failure, seek
the consumer back to the failed message's offset:

```
consumer.seek(partition, offset)
```

The message is redelivered on the next poll. This matches the current
RabbitMQ `basic_nack(requeue=True)` behaviour: the message is retried
by the same consumer.

### Topic Lifecycle

The flow service creates and deletes topics via the Kafka AdminClient:

- **Flow start**: `AdminClient.create_topics()` for each unique topic
  in the blueprint. Topic config includes `retention.ms` based on class.
- **Flow stop**: `AdminClient.delete_topics()` for the flow's topics.
- **Service startup**: `ensure_topic` creates the topic if it doesn't
  exist (idempotent: `TOPIC_ALREADY_EXISTS` errors from
  `create_topics` are caught and ignored).

Unlike RabbitMQ where consumers declare their own queues, Kafka topics
must exist before consumers connect. The flow service and service
startup `ensure_topic` calls handle this.

### Message Encoding

JSON body, consistent with the RabbitMQ backend. Serialisation uses the
existing `dataclass_to_dict` / `dict_to_dataclass` helpers. Message
properties map to Kafka record headers (byte-encoded string values).

### Configuration

New CLI arguments following the existing pattern:

```
--pubsub-backend kafka
--kafka-bootstrap-servers localhost:9092
--kafka-security-protocol PLAINTEXT
--kafka-sasl-mechanism (optional)
--kafka-sasl-username (optional)
--kafka-sasl-password (optional)
```

The factory in `pubsub.py` creates a `KafkaBackend` instance when
`pubsub_backend='kafka'`.

### Dependencies

`kafka-python-ng` or `confluent-kafka`.
The `confluent-kafka` package +provides both producer/consumer and AdminClient in one library with +better performance (C-backed librdkafka), but requires a C extension +build. `kafka-python-ng` is pure Python, simpler to install. + +## Key Design Decisions + +1. **Shared response topic with filtering** over per-instance temporary + topics. Avoids expensive dynamic topic creation for every RPC + exchange. Acceptable because response traffic is low-volume. + +2. **Seek-back for negative acknowledge** over not-committing or retry + topics. Provides immediate redelivery consistent with the RabbitMQ + nack behaviour. + +3. **Single partition per topic** to preserve ordering and simplify + offset management. Parallelism comes from multiple topics and + multiple services, not from partitioning within a topic. + +4. **Retention-based class semantics** instead of durability flags. + Kafka topics are always durable; short retention achieves the + ephemeral behaviour needed for request/response/notify classes. + +## Open Questions + +- **Retention values**: exact `retention.ms` for short-lived topic + classes. 300s (5 minutes) is a starting point; may need tuning based + on worst-case restart/reconnect times. + +- **Library choice**: `confluent-kafka` vs `kafka-python-ng`. Performance + vs install simplicity trade-off. Could support both behind a thin + wrapper. + +- **Consumer poll timeout**: needs to align with the existing + `receive(timeout_millis)` API. Kafka's `poll()` takes a timeout + directly, so this maps cleanly. diff --git a/tests/unit/test_pubsub/test_kafka_backend.py b/tests/unit/test_pubsub/test_kafka_backend.py new file mode 100644 index 00000000..456386f0 --- /dev/null +++ b/tests/unit/test_pubsub/test_kafka_backend.py @@ -0,0 +1,131 @@ +""" +Unit tests for Kafka backend — topic parsing and factory dispatch. +Does not require a running Kafka instance. 
+""" + +import pytest +import argparse + +from trustgraph.base.kafka_backend import KafkaBackend +from trustgraph.base.pubsub import get_pubsub, add_pubsub_args + + +class TestKafkaParseTopic: + + @pytest.fixture + def backend(self): + b = object.__new__(KafkaBackend) + return b + + def test_flow_is_durable(self, backend): + name, cls, durable = backend._parse_topic('flow:tg:text-completion-request') + assert durable is True + assert cls == 'flow' + assert name == 'tg.flow.text-completion-request' + + def test_notify_is_not_durable(self, backend): + name, cls, durable = backend._parse_topic('notify:tg:config') + assert durable is False + assert cls == 'notify' + assert name == 'tg.notify.config' + + def test_request_is_not_durable(self, backend): + name, cls, durable = backend._parse_topic('request:tg:config') + assert durable is False + assert cls == 'request' + assert name == 'tg.request.config' + + def test_response_is_not_durable(self, backend): + name, cls, durable = backend._parse_topic('response:tg:librarian') + assert durable is False + assert cls == 'response' + assert name == 'tg.response.librarian' + + def test_custom_topicspace(self, backend): + name, cls, durable = backend._parse_topic('flow:prod:my-queue') + assert name == 'prod.flow.my-queue' + assert durable is True + + def test_no_colon_defaults_to_flow(self, backend): + name, cls, durable = backend._parse_topic('simple-queue') + assert name == 'tg.flow.simple-queue' + assert cls == 'flow' + assert durable is True + + def test_invalid_class_raises(self, backend): + with pytest.raises(ValueError, match="Invalid topic class"): + backend._parse_topic('unknown:tg:topic') + + def test_topic_with_flow_suffix(self, backend): + """Topic names with flow suffix (e.g. 
:default) are preserved.""" + name, cls, durable = backend._parse_topic('request:tg:prompt:default') + assert name == 'tg.request.prompt:default' + + +class TestKafkaRetention: + + @pytest.fixture + def backend(self): + b = object.__new__(KafkaBackend) + return b + + def test_flow_gets_long_retention(self, backend): + assert backend._retention_ms('flow') == 7 * 24 * 60 * 60 * 1000 + + def test_request_gets_short_retention(self, backend): + assert backend._retention_ms('request') == 300 * 1000 + + def test_response_gets_short_retention(self, backend): + assert backend._retention_ms('response') == 300 * 1000 + + def test_notify_gets_short_retention(self, backend): + assert backend._retention_ms('notify') == 300 * 1000 + + +class TestGetPubsubKafka: + + def test_factory_creates_kafka_backend(self): + backend = get_pubsub(pubsub_backend='kafka') + assert isinstance(backend, KafkaBackend) + + def test_factory_passes_config(self): + backend = get_pubsub( + pubsub_backend='kafka', + kafka_bootstrap_servers='myhost:9093', + kafka_security_protocol='SASL_SSL', + kafka_sasl_mechanism='PLAIN', + kafka_sasl_username='user', + kafka_sasl_password='pass', + ) + assert isinstance(backend, KafkaBackend) + assert backend._bootstrap_servers == 'myhost:9093' + assert backend._admin_config['security.protocol'] == 'SASL_SSL' + assert backend._admin_config['sasl.mechanism'] == 'PLAIN' + assert backend._admin_config['sasl.username'] == 'user' + assert backend._admin_config['sasl.password'] == 'pass' + + +class TestAddPubsubArgsKafka: + + def test_kafka_args_present(self): + parser = argparse.ArgumentParser() + add_pubsub_args(parser) + args = parser.parse_args([ + '--pubsub-backend', 'kafka', + '--kafka-bootstrap-servers', 'myhost:9093', + ]) + assert args.pubsub_backend == 'kafka' + assert args.kafka_bootstrap_servers == 'myhost:9093' + + def test_kafka_defaults_container(self): + parser = argparse.ArgumentParser() + add_pubsub_args(parser) + args = parser.parse_args([]) + assert 
args.kafka_bootstrap_servers == 'kafka:9092' + assert args.kafka_security_protocol == 'PLAINTEXT' + + def test_kafka_standalone_defaults_to_localhost(self): + parser = argparse.ArgumentParser() + add_pubsub_args(parser, standalone=True) + args = parser.parse_args([]) + assert args.kafka_bootstrap_servers == 'localhost:9092' diff --git a/tests/unit/test_pubsub/test_rabbitmq_backend.py b/tests/unit/test_pubsub/test_rabbitmq_backend.py index ffe18fd7..5bafb78f 100644 --- a/tests/unit/test_pubsub/test_rabbitmq_backend.py +++ b/tests/unit/test_pubsub/test_rabbitmq_backend.py @@ -1,18 +1,16 @@ """ -Unit tests for RabbitMQ backend — queue name mapping and factory dispatch. +Unit tests for RabbitMQ backend — topic parsing and factory dispatch. Does not require a running RabbitMQ instance. """ import pytest import argparse -pika = pytest.importorskip("pika", reason="pika not installed") - from trustgraph.base.rabbitmq_backend import RabbitMQBackend from trustgraph.base.pubsub import get_pubsub, add_pubsub_args -class TestRabbitMQMapQueueName: +class TestRabbitMQParseTopic: @pytest.fixture def backend(self): @@ -20,43 +18,48 @@ class TestRabbitMQMapQueueName: return b def test_flow_is_durable(self, backend): - name, durable = backend.map_queue_name('flow:tg:text-completion-request') + exchange, cls, durable = backend._parse_topic('flow:tg:text-completion-request') assert durable is True - assert name == 'tg.flow.text-completion-request' + assert cls == 'flow' + assert exchange == 'tg.flow.text-completion-request' def test_notify_is_not_durable(self, backend): - name, durable = backend.map_queue_name('notify:tg:config') + exchange, cls, durable = backend._parse_topic('notify:tg:config') assert durable is False - assert name == 'tg.notify.config' + assert cls == 'notify' + assert exchange == 'tg.notify.config' def test_request_is_not_durable(self, backend): - name, durable = backend.map_queue_name('request:tg:config') + exchange, cls, durable = 
backend._parse_topic('request:tg:config') assert durable is False - assert name == 'tg.request.config' + assert cls == 'request' + assert exchange == 'tg.request.config' def test_response_is_not_durable(self, backend): - name, durable = backend.map_queue_name('response:tg:librarian') + exchange, cls, durable = backend._parse_topic('response:tg:librarian') assert durable is False - assert name == 'tg.response.librarian' + assert cls == 'response' + assert exchange == 'tg.response.librarian' def test_custom_topicspace(self, backend): - name, durable = backend.map_queue_name('flow:prod:my-queue') - assert name == 'prod.flow.my-queue' + exchange, cls, durable = backend._parse_topic('flow:prod:my-queue') + assert exchange == 'prod.flow.my-queue' assert durable is True def test_no_colon_defaults_to_flow(self, backend): - name, durable = backend.map_queue_name('simple-queue') - assert name == 'tg.simple-queue' - assert durable is False + exchange, cls, durable = backend._parse_topic('simple-queue') + assert exchange == 'tg.flow.simple-queue' + assert cls == 'flow' + assert durable is True def test_invalid_class_raises(self, backend): - with pytest.raises(ValueError, match="Invalid queue class"): - backend.map_queue_name('unknown:tg:topic') + with pytest.raises(ValueError, match="Invalid topic class"): + backend._parse_topic('unknown:tg:topic') - def test_flow_with_flow_suffix(self, backend): - """Queue names with flow suffix (e.g. :default) are preserved.""" - name, durable = backend.map_queue_name('request:tg:prompt:default') - assert name == 'tg.request.prompt:default' + def test_topic_with_flow_suffix(self, backend): + """Topic names with flow suffix (e.g. 
:default) are preserved.""" + exchange, cls, durable = backend._parse_topic('request:tg:prompt:default') + assert exchange == 'tg.request.prompt:default' class TestGetPubsubRabbitMQ: diff --git a/trustgraph-base/pyproject.toml b/trustgraph-base/pyproject.toml index 4f1bce76..c4ef3c27 100644 --- a/trustgraph-base/pyproject.toml +++ b/trustgraph-base/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "requests", "python-logging-loki", "pika", + "confluent-kafka", "pyyaml", ] classifiers = [ diff --git a/trustgraph-base/trustgraph/base/backend.py b/trustgraph-base/trustgraph/base/backend.py index 0f95ca1b..a105ca17 100644 --- a/trustgraph-base/trustgraph/base/backend.py +++ b/trustgraph-base/trustgraph/base/backend.py @@ -121,7 +121,7 @@ class PubSubBackend(Protocol): Create a producer for a topic. Args: - topic: Generic topic format (qos/tenant/namespace/queue) + topic: Queue identifier in class:topicspace:topic format schema: Dataclass type for messages **options: Backend-specific options (e.g., chunking_enabled) @@ -159,59 +159,55 @@ class PubSubBackend(Protocol): """ ... - async def create_queue(self, topic: str, subscription: str) -> None: + async def create_topic(self, topic: str) -> None: """ - Pre-create a queue so it exists before any consumer connects. + Create the broker-side resources for a logical topic. - The topic and subscription together identify the queue, mirroring - create_consumer where the queue name is derived from both. + For RabbitMQ this creates a fanout exchange. For Pulsar this is + a no-op (topics auto-create on first use). - Idempotent — creating an already-existing queue succeeds silently. + Idempotent — creating an already-existing topic succeeds silently. Args: - topic: Queue identifier in class:topicspace:topic format - subscription: Subscription/consumer group name + topic: Topic identifier in class:topicspace:topic format """ ... 
- async def delete_queue(self, topic: str, subscription: str) -> None: + async def delete_topic(self, topic: str) -> None: """ - Delete a queue and any messages it contains. + Delete a topic and discard any in-flight messages. - The topic and subscription together identify the queue, mirroring - create_consumer where the queue name is derived from both. + For RabbitMQ this deletes the fanout exchange; consumer queues + lose their binding and drain naturally. - Idempotent — deleting a non-existent queue succeeds silently. + Idempotent — deleting a non-existent topic succeeds silently. Args: - topic: Queue identifier in class:topicspace:topic format - subscription: Subscription/consumer group name + topic: Topic identifier in class:topicspace:topic format """ ... - async def queue_exists(self, topic: str, subscription: str) -> bool: + async def topic_exists(self, topic: str) -> bool: """ - Check whether a queue exists. + Check whether a topic exists. Args: - topic: Queue identifier in class:topicspace:topic format - subscription: Subscription/consumer group name + topic: Topic identifier in class:topicspace:topic format Returns: - True if the queue exists, False otherwise. + True if the topic exists, False otherwise. """ ... - async def ensure_queue(self, topic: str, subscription: str) -> None: + async def ensure_topic(self, topic: str) -> None: """ - Ensure a queue exists, creating it if necessary. + Ensure a topic exists, creating it if necessary. Convenience wrapper — checks existence, creates if missing. - Used by system services on startup. + Used by the flow service and system services on startup. Args: - topic: Queue identifier in class:topicspace:topic format - subscription: Subscription/consumer group name + topic: Topic identifier in class:topicspace:topic format """ ... 
diff --git a/trustgraph-base/trustgraph/base/kafka_backend.py b/trustgraph-base/trustgraph/base/kafka_backend.py new file mode 100644 index 00000000..8dfe8bfa --- /dev/null +++ b/trustgraph-base/trustgraph/base/kafka_backend.py @@ -0,0 +1,400 @@ +""" +Kafka backend implementation for pub/sub abstraction. + +Each logical topic maps to a Kafka topic. The topic name encodes +the full identity: + + class:topicspace:topic -> topicspace.class.topic + +Producers publish to the topic directly. +Consumers use consumer groups for competing-consumer semantics: + + - flow / request: named consumer group (competing consumers) + - response / notify: unique consumer group per instance, filtering + messages by correlation ID (all subscribers see all messages) + +The flow service manages topic lifecycle via AdminClient. + +Architecture: + Producer --> [Kafka topic] --> Consumer Group A --> Consumer + --> Consumer Group A --> Consumer + --> Consumer Group B --> Consumer (response) +""" + +import asyncio +import json +import logging +import uuid +from typing import Any + +from confluent_kafka import ( + Producer as KafkaProducer, + Consumer as KafkaConsumer, + TopicPartition, + KafkaError, + KafkaException, +) +from confluent_kafka.admin import AdminClient, NewTopic + +from .backend import PubSubBackend, BackendProducer, BackendConsumer, Message +from .serialization import dataclass_to_dict, dict_to_dataclass + +logger = logging.getLogger(__name__) + +# Retention defaults (milliseconds) +LONG_RETENTION_MS = 7 * 24 * 60 * 60 * 1000 # 7 days +SHORT_RETENTION_MS = 300 * 1000 # 5 minutes + + +class KafkaMessage: + """Wrapper for Kafka messages to match Message protocol.""" + + def __init__(self, msg, schema_cls): + self._msg = msg + self._schema_cls = schema_cls + self._value = None + + def value(self) -> Any: + """Deserialize and return the message value as a dataclass.""" + if self._value is None: + data_dict = json.loads(self._msg.value().decode('utf-8')) + self._value = 
dict_to_dataclass(data_dict, self._schema_cls) + return self._value + + def properties(self) -> dict: + """Return message properties from Kafka headers.""" + headers = self._msg.headers() or [] + return { + k: v.decode('utf-8') if isinstance(v, bytes) else v + for k, v in headers + } + + +class KafkaBackendProducer: + """Publishes messages to a Kafka topic. + + confluent-kafka Producer is thread-safe, so a single instance + can be shared across threads. + """ + + def __init__(self, bootstrap_servers, topic_name, durable): + self._topic_name = topic_name + self._durable = durable + self._producer = KafkaProducer({ + 'bootstrap.servers': bootstrap_servers, + 'acks': 'all' if durable else '1', + }) + + def send(self, message: Any, properties: dict = {}) -> None: + data_dict = dataclass_to_dict(message) + json_data = json.dumps(data_dict).encode('utf-8') + + headers = [ + (k, str(v).encode('utf-8')) + for k, v in properties.items() + ] if properties else None + + self._producer.produce( + topic=self._topic_name, + value=json_data, + headers=headers, + ) + self._producer.flush() + + def flush(self) -> None: + self._producer.flush() + + def close(self) -> None: + self._producer.flush() + + +class KafkaBackendConsumer: + """Consumes from a Kafka topic using a consumer group. + + Uses confluent-kafka Consumer.poll() for message delivery. + Not thread-safe — each instance must be used from a single thread, + which matches the ThreadPoolExecutor pattern in consumer.py. 
+ """ + + def __init__(self, bootstrap_servers, topic_name, group_id, + schema_cls, auto_offset_reset='latest'): + self._bootstrap_servers = bootstrap_servers + self._topic_name = topic_name + self._group_id = group_id + self._schema_cls = schema_cls + self._auto_offset_reset = auto_offset_reset + self._consumer = None + + def _connect(self): + self._consumer = KafkaConsumer({ + 'bootstrap.servers': self._bootstrap_servers, + 'group.id': self._group_id, + 'auto.offset.reset': self._auto_offset_reset, + 'enable.auto.commit': False, + }) + self._consumer.subscribe([self._topic_name]) + logger.info( + f"Kafka consumer connected: topic={self._topic_name}, " + f"group={self._group_id}" + ) + + def _is_alive(self): + return self._consumer is not None + + def ensure_connected(self) -> None: + """Eagerly connect and subscribe. + + For response/notify consumers this must be called before the + corresponding request is published, so that the consumer is + assigned a partition and will see the response message. + """ + if not self._is_alive(): + self._connect() + + # Force a partition assignment by polling briefly. + # Without this, the consumer may not be assigned partitions + # until the first real poll(), creating a race where the + # request is sent before assignment completes. + self._consumer.poll(timeout=1.0) + + def receive(self, timeout_millis: int = 2000) -> Message: + """Receive a message. 
Raises TimeoutError if none available.""" + if not self._is_alive(): + self._connect() + + timeout_seconds = timeout_millis / 1000.0 + msg = self._consumer.poll(timeout=timeout_seconds) + + if msg is None: + raise TimeoutError("No message received within timeout") + + if msg.error(): + error = msg.error() + if error.code() == KafkaError._PARTITION_EOF: + raise TimeoutError("End of partition reached") + raise KafkaException(error) + + return KafkaMessage(msg, self._schema_cls) + + def acknowledge(self, message: Message) -> None: + """Commit the message's offset (next offset to read).""" + if isinstance(message, KafkaMessage) and message._msg: + tp = TopicPartition( + message._msg.topic(), + message._msg.partition(), + message._msg.offset() + 1, + ) + self._consumer.commit(offsets=[tp], asynchronous=False) + + def negative_acknowledge(self, message: Message) -> None: + """Seek back to the message's offset for redelivery.""" + if isinstance(message, KafkaMessage) and message._msg: + tp = TopicPartition( + message._msg.topic(), + message._msg.partition(), + message._msg.offset(), + ) + self._consumer.seek(tp) + + def unsubscribe(self) -> None: + if self._consumer: + try: + self._consumer.unsubscribe() + except Exception: + pass + + def close(self) -> None: + if self._consumer: + try: + self._consumer.close() + except Exception: + pass + self._consumer = None + + +class KafkaBackend: + """Kafka pub/sub backend using one topic per logical topic.""" + + def __init__(self, bootstrap_servers='localhost:9092', + security_protocol='PLAINTEXT', + sasl_mechanism=None, + sasl_username=None, + sasl_password=None): + self._bootstrap_servers = bootstrap_servers + + # AdminClient config + self._admin_config = { + 'bootstrap.servers': bootstrap_servers, + } + + if security_protocol != 'PLAINTEXT': + self._admin_config['security.protocol'] = security_protocol + if sasl_mechanism: + self._admin_config['sasl.mechanism'] = sasl_mechanism + if sasl_username: + 
self._admin_config['sasl.username'] = sasl_username + if sasl_password: + self._admin_config['sasl.password'] = sasl_password + + logger.info( + f"Kafka backend: {bootstrap_servers} " + f"protocol={security_protocol}" + ) + + def _parse_topic(self, topic_id: str) -> tuple[str, str, bool]: + """ + Parse topic identifier into Kafka topic name, class, and durability. + + Format: class:topicspace:topic + Returns: (topic_name, class, durable) + """ + if ':' not in topic_id: + return f'tg.flow.{topic_id}', 'flow', True + + parts = topic_id.split(':', 2) + if len(parts) != 3: + raise ValueError( + f"Invalid topic format: {topic_id}, " + f"expected class:topicspace:topic" + ) + + cls, topicspace, topic = parts + + if cls == 'flow': + durable = True + elif cls in ('request', 'response', 'notify'): + durable = False + else: + raise ValueError( + f"Invalid topic class: {cls}, " + f"expected flow, request, response, or notify" + ) + + topic_name = f"{topicspace}.{cls}.{topic}" + + return topic_name, cls, durable + + def _retention_ms(self, cls): + """Return retention.ms for a topic class.""" + if cls == 'flow': + return LONG_RETENTION_MS + return SHORT_RETENTION_MS + + def create_producer(self, topic: str, schema: type, + **options) -> BackendProducer: + topic_name, cls, durable = self._parse_topic(topic) + logger.debug(f"Creating producer: topic={topic_name}") + return KafkaBackendProducer( + self._bootstrap_servers, topic_name, durable, + ) + + def create_consumer(self, topic: str, subscription: str, schema: type, + initial_position: str = 'latest', + **options) -> BackendConsumer: + """Create a consumer subscribed to a Kafka topic. 
+ + Behaviour is determined by the topic's class prefix: + - flow: named consumer group, competing consumers + - request: named consumer group, competing consumers + - response: unique consumer group per instance + - notify: unique consumer group per instance + """ + topic_name, cls, durable = self._parse_topic(topic) + + if cls in ('response', 'notify'): + # Per-subscriber: unique group so every instance sees + # every message. Filter by correlation ID happens at + # the Subscriber layer above. + group_id = f"{subscription}-{uuid.uuid4()}" + auto_offset_reset = 'latest' + else: + # Shared: named group, competing consumers + group_id = subscription + auto_offset_reset = ( + 'earliest' if initial_position == 'earliest' + else 'latest' + ) + + logger.debug( + f"Creating consumer: topic={topic_name}, " + f"group={group_id}, cls={cls}" + ) + + return KafkaBackendConsumer( + self._bootstrap_servers, topic_name, group_id, + schema, auto_offset_reset, + ) + + def _create_topic_sync(self, topic_name, retention_ms): + """Blocking topic creation via AdminClient.""" + admin = AdminClient(self._admin_config) + new_topic = NewTopic( + topic_name, + num_partitions=1, + replication_factor=1, + config={ + 'retention.ms': str(retention_ms), + }, + ) + fs = admin.create_topics([new_topic]) + for name, f in fs.items(): + try: + f.result() + logger.info(f"Created topic: {name}") + except KafkaException as e: + # Topic already exists — idempotent + if e.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS: + logger.debug(f"Topic already exists: {name}") + else: + raise + + async def create_topic(self, topic: str) -> None: + """Create a Kafka topic with appropriate retention.""" + topic_name, cls, durable = self._parse_topic(topic) + retention_ms = self._retention_ms(cls) + await asyncio.to_thread( + self._create_topic_sync, topic_name, retention_ms, + ) + + def _delete_topic_sync(self, topic_name): + """Blocking topic deletion via AdminClient.""" + admin = AdminClient(self._admin_config) 
+ fs = admin.delete_topics([topic_name]) + for name, f in fs.items(): + try: + f.result() + logger.info(f"Deleted topic: {name}") + except KafkaException as e: + # Topic doesn't exist — idempotent + if e.args[0].code() == KafkaError.UNKNOWN_TOPIC_OR_PART: + logger.debug(f"Topic not found: {name}") + else: + raise + except Exception as e: + logger.debug(f"Topic delete for {name}: {e}") + + async def delete_topic(self, topic: str) -> None: + """Delete a Kafka topic.""" + topic_name, cls, durable = self._parse_topic(topic) + await asyncio.to_thread(self._delete_topic_sync, topic_name) + + def _topic_exists_sync(self, topic_name): + """Blocking topic existence check via AdminClient.""" + admin = AdminClient(self._admin_config) + metadata = admin.list_topics(timeout=10) + return topic_name in metadata.topics + + async def topic_exists(self, topic: str) -> bool: + """Check whether a Kafka topic exists.""" + topic_name, cls, durable = self._parse_topic(topic) + return await asyncio.to_thread( + self._topic_exists_sync, topic_name, + ) + + async def ensure_topic(self, topic: str) -> None: + """Ensure a topic exists, creating it if necessary.""" + if not await self.topic_exists(topic): + await self.create_topic(topic) + + def close(self) -> None: + pass diff --git a/trustgraph-base/trustgraph/base/pubsub.py b/trustgraph-base/trustgraph/base/pubsub.py index a7ae3719..fb4765c1 100644 --- a/trustgraph-base/trustgraph/base/pubsub.py +++ b/trustgraph-base/trustgraph/base/pubsub.py @@ -17,6 +17,12 @@ DEFAULT_RABBITMQ_USERNAME = os.getenv("RABBITMQ_USERNAME", 'guest') DEFAULT_RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD", 'guest') DEFAULT_RABBITMQ_VHOST = os.getenv("RABBITMQ_VHOST", '/') +DEFAULT_KAFKA_BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP_SERVERS", 'kafka:9092') +DEFAULT_KAFKA_PROTOCOL = os.getenv("KAFKA_SECURITY_PROTOCOL", 'PLAINTEXT') +DEFAULT_KAFKA_SASL_MECHANISM = os.getenv("KAFKA_SASL_MECHANISM", None) +DEFAULT_KAFKA_SASL_USERNAME = os.getenv("KAFKA_SASL_USERNAME", 
None) +DEFAULT_KAFKA_SASL_PASSWORD = os.getenv("KAFKA_SASL_PASSWORD", None) + def get_pubsub(**config: Any) -> Any: """ @@ -47,6 +53,25 @@ def get_pubsub(**config: Any) -> Any: password=config.get('rabbitmq_password', DEFAULT_RABBITMQ_PASSWORD), vhost=config.get('rabbitmq_vhost', DEFAULT_RABBITMQ_VHOST), ) + elif backend_type == 'kafka': + from .kafka_backend import KafkaBackend + return KafkaBackend( + bootstrap_servers=config.get( + 'kafka_bootstrap_servers', DEFAULT_KAFKA_BOOTSTRAP, + ), + security_protocol=config.get( + 'kafka_security_protocol', DEFAULT_KAFKA_PROTOCOL, + ), + sasl_mechanism=config.get( + 'kafka_sasl_mechanism', DEFAULT_KAFKA_SASL_MECHANISM, + ), + sasl_username=config.get( + 'kafka_sasl_username', DEFAULT_KAFKA_SASL_USERNAME, + ), + sasl_password=config.get( + 'kafka_sasl_password', DEFAULT_KAFKA_SASL_PASSWORD, + ), + ) else: raise ValueError(f"Unknown pub/sub backend: {backend_type}") @@ -65,6 +90,7 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None: pulsar_host = STANDALONE_PULSAR_HOST if standalone else DEFAULT_PULSAR_HOST pulsar_listener = 'localhost' if standalone else None rabbitmq_host = 'localhost' if standalone else DEFAULT_RABBITMQ_HOST + kafka_bootstrap = 'localhost:9092' if standalone else DEFAULT_KAFKA_BOOTSTRAP parser.add_argument( '--pubsub-backend', @@ -122,3 +148,34 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None: default=DEFAULT_RABBITMQ_VHOST, help=f'RabbitMQ vhost (default: {DEFAULT_RABBITMQ_VHOST})', ) + + # Kafka options + parser.add_argument( + '--kafka-bootstrap-servers', + default=kafka_bootstrap, + help=f'Kafka bootstrap servers (default: {kafka_bootstrap})', + ) + + parser.add_argument( + '--kafka-security-protocol', + default=DEFAULT_KAFKA_PROTOCOL, + help=f'Kafka security protocol (default: {DEFAULT_KAFKA_PROTOCOL})', + ) + + parser.add_argument( + '--kafka-sasl-mechanism', + default=DEFAULT_KAFKA_SASL_MECHANISM, + help='Kafka SASL mechanism', + ) + + 
parser.add_argument( + '--kafka-sasl-username', + default=DEFAULT_KAFKA_SASL_USERNAME, + help='Kafka SASL username', + ) + + parser.add_argument( + '--kafka-sasl-password', + default=DEFAULT_KAFKA_SASL_PASSWORD, + help='Kafka SASL password', + ) diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index 2100483d..e27d16af 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -266,22 +266,22 @@ class PulsarBackend: return PulsarBackendConsumer(pulsar_consumer, schema) - async def create_queue(self, topic: str, subscription: str) -> None: + async def create_topic(self, topic: str) -> None: """No-op — Pulsar auto-creates topics on first use. TODO: Use admin REST API for explicit persistent topic creation.""" pass - async def delete_queue(self, topic: str, subscription: str) -> None: + async def delete_topic(self, topic: str) -> None: """No-op — to be replaced with admin REST API calls. - TODO: Delete subscription and persistent topic via admin API.""" + TODO: Delete persistent topic via admin API.""" pass - async def queue_exists(self, topic: str, subscription: str) -> bool: + async def topic_exists(self, topic: str) -> bool: """Returns True — Pulsar auto-creates on subscribe. TODO: Use admin REST API for actual existence check.""" return True - async def ensure_queue(self, topic: str, subscription: str) -> None: + async def ensure_topic(self, topic: str) -> None: """No-op — Pulsar auto-creates topics on first use. TODO: Use admin REST API for explicit creation.""" pass diff --git a/trustgraph-base/trustgraph/base/rabbitmq_backend.py b/trustgraph-base/trustgraph/base/rabbitmq_backend.py index 43c717c3..73b80cb9 100644 --- a/trustgraph-base/trustgraph/base/rabbitmq_backend.py +++ b/trustgraph-base/trustgraph/base/rabbitmq_backend.py @@ -1,22 +1,24 @@ """ RabbitMQ backend implementation for pub/sub abstraction. 
-Uses a single topic exchange per topicspace. The logical queue name -becomes the routing key. Consumer behavior is determined by the -subscription name: +Each logical topic maps to its own fanout exchange. The exchange name +encodes the full topic identity: -- Same subscription + same topic = shared queue (competing consumers) -- Different subscriptions = separate queues (broadcast / fan-out) + class:topicspace:topic → exchange topicspace.class.topic -This mirrors Pulsar's subscription model using idiomatic RabbitMQ. +Producers publish to the exchange with an empty routing key. +Consumers declare and bind their own queues: + + - flow / request: named durable/non-durable queue (competing consumers) + - response / notify: anonymous exclusive auto-delete queue (per-subscriber) + +The flow service manages topic lifecycle (create/delete exchanges). +Consumers manage their own queue lifecycle (declare + bind on connect). Architecture: - Producer --> [tg exchange] --routing key--> [named queue] --> Consumer - --routing key--> [named queue] --> Consumer - --routing key--> [exclusive q] --> Subscriber - -Uses basic_consume (push) instead of basic_get (polling) for -efficient message delivery. + Producer --> [fanout exchange] --> [named queue] --> Consumer + --> [named queue] --> Consumer + --> [exclusive queue] --> Subscriber """ import asyncio @@ -58,18 +60,16 @@ class RabbitMQMessage: class RabbitMQBackendProducer: - """Publishes messages to a topic exchange with a routing key. + """Publishes messages to a fanout exchange. Uses thread-local connections so each thread gets its own connection/channel. This avoids wire corruption from concurrent threads writing to the same socket (pika is not thread-safe). 
""" - def __init__(self, connection_params, exchange_name, routing_key, - durable): + def __init__(self, connection_params, exchange_name, durable): self._connection_params = connection_params self._exchange_name = exchange_name - self._routing_key = routing_key self._durable = durable self._local = threading.local() @@ -90,7 +90,7 @@ class RabbitMQBackendProducer: chan = conn.channel() chan.exchange_declare( exchange=self._exchange_name, - exchange_type='topic', + exchange_type='fanout', durable=True, ) self._local.connection = conn @@ -113,7 +113,7 @@ class RabbitMQBackendProducer: channel = self._get_channel() channel.basic_publish( exchange=self._exchange_name, - routing_key=self._routing_key, + routing_key='', body=json_data.encode('utf-8'), properties=amqp_properties, ) @@ -144,19 +144,17 @@ class RabbitMQBackendProducer: class RabbitMQBackendConsumer: - """Consumes from a queue bound to a topic exchange. + """Consumes from a queue bound to a fanout exchange. Uses basic_consume (push model) with messages delivered to an internal thread-safe queue. process_data_events() drives both message delivery and heartbeat processing. 
""" - def __init__(self, connection_params, exchange_name, routing_key, - queue_name, schema_cls, durable, exclusive=False, - auto_delete=False): + def __init__(self, connection_params, exchange_name, queue_name, + schema_cls, durable, exclusive=False, auto_delete=False): self._connection_params = connection_params self._exchange_name = exchange_name - self._routing_key = routing_key self._queue_name = queue_name self._schema_cls = schema_cls self._durable = durable @@ -171,17 +169,16 @@ class RabbitMQBackendConsumer: self._connection = pika.BlockingConnection(self._connection_params) self._channel = self._connection.channel() - # Declare the topic exchange (idempotent, also done by producers) + # Declare the fanout exchange (idempotent) self._channel.exchange_declare( exchange=self._exchange_name, - exchange_type='topic', + exchange_type='fanout', durable=True, ) if self._exclusive: # Anonymous ephemeral queue (response/notify class). - # These are per-consumer and must be created here — the - # broker assigns the name. + # Per-consumer, broker assigns the name. result = self._channel.queue_declare( queue='', durable=False, @@ -189,20 +186,22 @@ class RabbitMQBackendConsumer: auto_delete=True, ) self._queue_name = result.method.queue - - self._channel.queue_bind( - queue=self._queue_name, - exchange=self._exchange_name, - routing_key=self._routing_key, - ) else: - # Named queue (flow/request class). Queue must already - # exist — created by the flow service or ensure_queue. - # We just verify it exists and bind to consume. + # Named queue (flow/request class). + # Consumer owns its queue — declare and bind here. 
self._channel.queue_declare( - queue=self._queue_name, passive=True, + queue=self._queue_name, + durable=self._durable, + exclusive=False, + auto_delete=False, ) + # Bind queue to the fanout exchange + self._channel.queue_bind( + queue=self._queue_name, + exchange=self._exchange_name, + ) + self._channel.basic_qos(prefetch_count=1) # Register push-based consumer @@ -318,7 +317,7 @@ class RabbitMQBackendConsumer: class RabbitMQBackend: - """RabbitMQ pub/sub backend using a topic exchange per topicspace.""" + """RabbitMQ pub/sub backend using one fanout exchange per topic.""" def __init__(self, host='localhost', port=5672, username='guest', password='guest', vhost='/'): @@ -331,20 +330,23 @@ class RabbitMQBackend: ) logger.info(f"RabbitMQ backend: {host}:{port} vhost={vhost}") - def _parse_queue_id(self, queue_id: str) -> tuple[str, str, str, bool]: + def _parse_topic(self, topic_id: str) -> tuple[str, str, bool]: """ - Parse queue identifier into exchange, routing key, and durability. + Parse topic identifier into exchange name and durability. 
Format: class:topicspace:topic - Returns: (exchange_name, routing_key, class, durable) - """ - if ':' not in queue_id: - return 'tg', queue_id, 'flow', False + Returns: (exchange_name, class, durable) - parts = queue_id.split(':', 2) + The exchange name encodes the full topic identity: + class:topicspace:topic → topicspace.class.topic + """ + if ':' not in topic_id: + return f'tg.flow.{topic_id}', 'flow', True + + parts = topic_id.split(':', 2) if len(parts) != 3: raise ValueError( - f"Invalid queue format: {queue_id}, " + f"Invalid topic format: {topic_id}, " f"expected class:topicspace:topic" ) @@ -356,36 +358,28 @@ class RabbitMQBackend: durable = False else: raise ValueError( - f"Invalid queue class: {cls}, " + f"Invalid topic class: {cls}, " f"expected flow, request, response, or notify" ) - # Exchange per topicspace, routing key includes class - exchange_name = topicspace - routing_key = f"{cls}.{topic}" + exchange_name = f"{topicspace}.{cls}.{topic}" - return exchange_name, routing_key, cls, durable - - # Keep map_queue_name for backward compatibility with tests - def map_queue_name(self, queue_id: str) -> tuple[str, bool]: - exchange, routing_key, cls, durable = self._parse_queue_id(queue_id) - return f"{exchange}.{routing_key}", durable + return exchange_name, cls, durable def create_producer(self, topic: str, schema: type, **options) -> BackendProducer: - exchange, routing_key, cls, durable = self._parse_queue_id(topic) + exchange, cls, durable = self._parse_topic(topic) logger.debug( - f"Creating producer: exchange={exchange}, " - f"routing_key={routing_key}" + f"Creating producer: exchange={exchange}" ) return RabbitMQBackendProducer( - self._connection_params, exchange, routing_key, durable, + self._connection_params, exchange, durable, ) def create_consumer(self, topic: str, subscription: str, schema: type, initial_position: str = 'latest', **options) -> BackendConsumer: - """Create a consumer with a queue bound to the topic exchange. 
+ """Create a consumer with a queue bound to the topic's exchange. Behaviour is determined by the topic's class prefix: - flow: named durable queue, competing consumers (round-robin) @@ -393,7 +387,7 @@ class RabbitMQBackend: - response: anonymous ephemeral queue, per-subscriber (auto-delete) - notify: anonymous ephemeral queue, per-subscriber (auto-delete) """ - exchange, routing_key, cls, durable = self._parse_queue_id(topic) + exchange, cls, durable = self._parse_topic(topic) if cls in ('response', 'notify'): # Per-subscriber: anonymous queue, auto-deleted on disconnect @@ -403,45 +397,33 @@ class RabbitMQBackend: auto_delete = True else: # Shared: named queue, competing consumers - queue_name = f"{exchange}.{routing_key}.{subscription}" + queue_name = f"{exchange}.{subscription}" queue_durable = durable exclusive = False auto_delete = False logger.debug( f"Creating consumer: exchange={exchange}, " - f"routing_key={routing_key}, queue={queue_name or '(anonymous)'}, " - f"cls={cls}" + f"queue={queue_name or '(anonymous)'}, cls={cls}" ) return RabbitMQBackendConsumer( - self._connection_params, exchange, routing_key, + self._connection_params, exchange, queue_name, schema, queue_durable, exclusive, auto_delete, ) - def _create_queue_sync(self, exchange, routing_key, queue_name, durable): - """Blocking queue creation — run via asyncio.to_thread.""" + def _create_topic_sync(self, exchange_name): + """Blocking exchange creation — run via asyncio.to_thread.""" connection = None try: connection = pika.BlockingConnection(self._connection_params) channel = connection.channel() channel.exchange_declare( - exchange=exchange, - exchange_type='topic', + exchange=exchange_name, + exchange_type='fanout', durable=True, ) - channel.queue_declare( - queue=queue_name, - durable=durable, - exclusive=False, - auto_delete=False, - ) - channel.queue_bind( - queue=queue_name, - exchange=exchange, - routing_key=routing_key, - ) - logger.info(f"Created queue: {queue_name}") + 
logger.info(f"Created topic (exchange): {exchange_name}") finally: if connection and connection.is_open: try: @@ -449,34 +431,30 @@ class RabbitMQBackend: except Exception: pass - async def create_queue(self, topic: str, subscription: str) -> None: - """Pre-create a named queue bound to the topic exchange. + async def create_topic(self, topic: str) -> None: + """Create the fanout exchange for a logical topic. - Only applies to shared queues (flow/request class). Response and - notify queues are anonymous/auto-delete and created by consumers. + Only applies to flow and request class topics. Response and + notify exchanges are created on demand by consumers. """ - exchange, routing_key, cls, durable = self._parse_queue_id(topic) + exchange, cls, durable = self._parse_topic(topic) if cls in ('response', 'notify'): return - queue_name = f"{exchange}.{routing_key}.{subscription}" - await asyncio.to_thread( - self._create_queue_sync, exchange, routing_key, - queue_name, durable, - ) + await asyncio.to_thread(self._create_topic_sync, exchange) - def _delete_queue_sync(self, queue_name): - """Blocking queue deletion — run via asyncio.to_thread.""" + def _delete_topic_sync(self, exchange_name): + """Blocking exchange deletion — run via asyncio.to_thread.""" connection = None try: connection = pika.BlockingConnection(self._connection_params) channel = connection.channel() - channel.queue_delete(queue=queue_name) - logger.info(f"Deleted queue: {queue_name}") + channel.exchange_delete(exchange=exchange_name) + logger.info(f"Deleted topic (exchange): {exchange_name}") except Exception as e: - # Idempotent — queue may already be gone - logger.debug(f"Queue delete for {queue_name}: {e}") + # Idempotent — exchange may already be gone + logger.debug(f"Exchange delete for {exchange_name}: {e}") finally: if connection and connection.is_open: try: @@ -484,31 +462,27 @@ class RabbitMQBackend: except Exception: pass - async def delete_queue(self, topic: str, subscription: str) -> None: 
- """Delete a named queue and any messages it contains. + async def delete_topic(self, topic: str) -> None: + """Delete a topic's fanout exchange. - Only applies to shared queues (flow/request class). Response and - notify queues are anonymous/auto-delete and managed by the broker. + Consumer queues lose their binding and drain naturally. """ - exchange, routing_key, cls, durable = self._parse_queue_id(topic) + exchange, cls, durable = self._parse_topic(topic) + await asyncio.to_thread(self._delete_topic_sync, exchange) - if cls in ('response', 'notify'): - return - - queue_name = f"{exchange}.{routing_key}.{subscription}" - await asyncio.to_thread(self._delete_queue_sync, queue_name) - - def _queue_exists_sync(self, queue_name): - """Blocking queue existence check — run via asyncio.to_thread. + def _topic_exists_sync(self, exchange_name): + """Blocking exchange existence check — run via asyncio.to_thread. Uses passive=True which checks without creating.""" connection = None try: connection = pika.BlockingConnection(self._connection_params) channel = connection.channel() - channel.queue_declare(queue=queue_name, passive=True) + channel.exchange_declare( + exchange=exchange_name, passive=True, + ) return True except pika.exceptions.ChannelClosedByBroker: - # 404 NOT_FOUND — queue does not exist + # 404 NOT_FOUND — exchange does not exist return False finally: if connection and connection.is_open: @@ -517,26 +491,25 @@ class RabbitMQBackend: except Exception: pass - async def queue_exists(self, topic: str, subscription: str) -> bool: - """Check whether a named queue exists. + async def topic_exists(self, topic: str) -> bool: + """Check whether a topic's exchange exists. - Only applies to shared queues (flow/request class). Response and - notify queues are anonymous/ephemeral — always returns False. + Only applies to flow and request class topics. Response and + notify topics are ephemeral — always returns False. 
""" - exchange, routing_key, cls, durable = self._parse_queue_id(topic) + exchange, cls, durable = self._parse_topic(topic) if cls in ('response', 'notify'): return False - queue_name = f"{exchange}.{routing_key}.{subscription}" return await asyncio.to_thread( - self._queue_exists_sync, queue_name + self._topic_exists_sync, exchange ) - async def ensure_queue(self, topic: str, subscription: str) -> None: - """Ensure a queue exists, creating it if necessary.""" - if not await self.queue_exists(topic, subscription): - await self.create_queue(topic, subscription) + async def ensure_topic(self, topic: str) -> None: + """Ensure a topic exists, creating it if necessary.""" + if not await self.topic_exists(topic): + await self.create_topic(topic) def close(self) -> None: pass diff --git a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py index 257edc8c..81fb7303 100755 --- a/trustgraph-flow/trustgraph/chunking/recursive/chunker.py +++ b/trustgraph-flow/trustgraph/chunking/recursive/chunker.py @@ -10,6 +10,8 @@ from prometheus_client import Histogram from ... schema import TextDocument, Chunk, Metadata, Triples from ... base import ChunkingService, ConsumerSpec, ProducerSpec +RecursiveCharacterTextSplitter = None + from ... 
provenance import ( chunk_uri as make_chunk_uri, derived_entity_triples, set_graph, GRAPH_SOURCE, @@ -41,8 +43,12 @@ class Processor(ChunkingService): self.default_chunk_size = chunk_size self.default_chunk_overlap = chunk_overlap - from langchain_text_splitters import RecursiveCharacterTextSplitter - self.RecursiveCharacterTextSplitter = RecursiveCharacterTextSplitter + global RecursiveCharacterTextSplitter + if RecursiveCharacterTextSplitter is None: + from langchain_text_splitters import ( + RecursiveCharacterTextSplitter as _cls, + ) + RecursiveCharacterTextSplitter = _cls if not hasattr(__class__, "chunk_metric"): __class__.chunk_metric = Histogram( diff --git a/trustgraph-flow/trustgraph/chunking/token/chunker.py b/trustgraph-flow/trustgraph/chunking/token/chunker.py index d315c428..e0d58b41 100755 --- a/trustgraph-flow/trustgraph/chunking/token/chunker.py +++ b/trustgraph-flow/trustgraph/chunking/token/chunker.py @@ -10,6 +10,8 @@ from prometheus_client import Histogram from ... schema import TextDocument, Chunk, Metadata, Triples from ... base import ChunkingService, ConsumerSpec, ProducerSpec +TokenTextSplitter = None + from ... 
provenance import ( chunk_uri as make_chunk_uri, derived_entity_triples, set_graph, GRAPH_SOURCE, @@ -41,8 +43,10 @@ class Processor(ChunkingService): self.default_chunk_size = chunk_size self.default_chunk_overlap = chunk_overlap - from langchain_text_splitters import TokenTextSplitter - self.TokenTextSplitter = TokenTextSplitter + global TokenTextSplitter + if TokenTextSplitter is None: + from langchain_text_splitters import TokenTextSplitter as _cls + TokenTextSplitter = _cls if not hasattr(__class__, "chunk_metric"): __class__.chunk_metric = Histogram( diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index 75232315..fe44b852 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -124,9 +124,7 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_queue( - self.config_request_topic, self.config_request_subscriber - ) + await self.pubsub.ensure_topic(self.config_request_topic) await self.push() # Startup poke: empty types = everything await self.config_request_consumer.start() diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index 400f96d1..93017c30 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -119,9 +119,7 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_queue( - self.knowledge_request_topic, self.knowledge_request_subscriber - ) + await self.pubsub.ensure_topic(self.knowledge_request_topic) await super(Processor, self).start() await self.knowledge_request_consumer.start() await self.knowledge_response_producer.start() diff --git a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py index ab31b717..7f9ca71d 100755 --- a/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py +++ 
b/trustgraph-flow/trustgraph/decoding/pdf/pdf_decoder.py @@ -15,6 +15,9 @@ from ... schema import Document, TextDocument, Metadata from ... schema import librarian_request_queue, librarian_response_queue from ... schema import Triples from ... base import FlowProcessor, ConsumerSpec, ProducerSpec, LibrarianClient + +PyPDFLoader = None + from ... provenance import ( document_uri, page_uri as make_page_uri, derived_entity_triples, set_graph, GRAPH_SOURCE, @@ -128,7 +131,12 @@ class Processor(FlowProcessor): fp.write(base64.b64decode(v.data)) fp.close() - from langchain_community.document_loaders import PyPDFLoader + global PyPDFLoader + if PyPDFLoader is None: + from langchain_community.document_loaders import ( + PyPDFLoader as _cls, + ) + PyPDFLoader = _cls loader = PyPDFLoader(temp_path) pages = loader.load() diff --git a/trustgraph-flow/trustgraph/flow/service/flow.py b/trustgraph-flow/trustgraph/flow/service/flow.py index 477c6a2c..b864faf9 100644 --- a/trustgraph-flow/trustgraph/flow/service/flow.py +++ b/trustgraph-flow/trustgraph/flow/service/flow.py @@ -7,7 +7,7 @@ import logging # Module logger logger = logging.getLogger(__name__) -# Queue deletion retry settings +# Topic deletion retry settings DELETE_RETRIES = 5 DELETE_RETRY_DELAY = 2 # seconds @@ -215,11 +215,11 @@ class FlowConfig: return result - # Pre-create flow-level queues so the data path is wired + # Pre-create topic exchanges so the data path is wired # before processors receive their config and start connecting. - queues = self._collect_flow_queues(cls, repl_template_with_params) - for topic, subscription in queues: - await self.pubsub.create_queue(topic, subscription) + topics = self._collect_flow_topics(cls, repl_template_with_params) + for topic in topics: + await self.pubsub.create_topic(topic) # Build all processor config updates, then write in a single batch. 
updates = [] @@ -283,8 +283,8 @@ class FlowConfig: error = None, ) - async def ensure_existing_flow_queues(self): - """Ensure queues exist for all already-running flows. + async def ensure_existing_flow_topics(self): + """Ensure topics exist for all already-running flows. Called on startup to handle flows that were started before this version of the flow service was deployed, or before a restart. @@ -315,7 +315,7 @@ class FlowConfig: if blueprint_data is None: logger.warning( f"Blueprint '{blueprint_name}' not found for " - f"flow '{flow_id}', skipping queue creation" + f"flow '{flow_id}', skipping topic creation" ) continue @@ -333,65 +333,63 @@ class FlowConfig: ) return result - queues = self._collect_flow_queues(cls, repl_template) - for topic, subscription in queues: - await self.pubsub.ensure_queue(topic, subscription) + topics = self._collect_flow_topics(cls, repl_template) + for topic in topics: + await self.pubsub.ensure_topic(topic) logger.info( - f"Ensured queues for existing flow '{flow_id}'" + f"Ensured topics for existing flow '{flow_id}'" ) except Exception as e: logger.error( - f"Failed to ensure queues for flow '{flow_id}': {e}" + f"Failed to ensure topics for flow '{flow_id}': {e}" ) - def _collect_flow_queues(self, cls, repl_template): - """Collect (topic, subscription) pairs for all flow-level queues. + def _collect_flow_topics(self, cls, repl_template): + """Collect unique topic identifiers from the blueprint. - Iterates the blueprint's "flow" section and reads only the - "topics" dict from each processor entry. + Iterates the blueprint's "flow" section and returns a + deduplicated set of resolved topic strings. The flow service + manages topic lifecycle (create/delete exchanges), not + individual consumer queues. 
""" - queues = [] + topics = set() for k, v in cls["flow"].items(): - processor, variant = k.split(":", 1) - variant = repl_template(variant) - for spec_name, topic_template in v.get("topics", {}).items(): topic = repl_template(topic_template) - subscription = f"{processor}--{variant}--{spec_name}" - queues.append((topic, subscription)) + topics.add(topic) - return queues + return topics - async def _delete_queues(self, queues): - """Delete queues with retries. Best-effort — logs failures but + async def _delete_topics(self, topics): + """Delete topics with retries. Best-effort — logs failures but does not raise.""" for attempt in range(DELETE_RETRIES): remaining = [] - for topic, subscription in queues: + for topic in topics: try: - await self.pubsub.delete_queue(topic, subscription) + await self.pubsub.delete_topic(topic) except Exception as e: logger.warning( - f"Queue delete failed (attempt {attempt + 1}/" + f"Topic delete failed (attempt {attempt + 1}/" f"{DELETE_RETRIES}): {topic}: {e}" ) - remaining.append((topic, subscription)) + remaining.append(topic) if not remaining: return - queues = remaining + topics = remaining if attempt < DELETE_RETRIES - 1: await asyncio.sleep(DELETE_RETRY_DELAY) - for topic, subscription in queues: + for topic in topics: logger.error( - f"Failed to delete queue after {DELETE_RETRIES} " + f"Failed to delete topic after {DELETE_RETRIES} " f"attempts: {topic}" ) @@ -426,8 +424,8 @@ class FlowConfig: result = result.replace(f"{{{param_name}}}", str(param_value)) return result - # Collect queue identifiers before removing config - queues = self._collect_flow_queues(cls, repl_template) + # Collect topic identifiers before removing config + topics = self._collect_flow_topics(cls, repl_template) # Phase 1: Set status to "stopping" and remove processor config. # The config push tells processors to shut down their consumers. 
@@ -448,8 +446,8 @@ class FlowConfig: await self.config.delete_many(deletes) - # Phase 2: Delete queues with retries, then remove the flow record. - await self._delete_queues(queues) + # Phase 2: Delete topics with retries, then remove the flow record. + await self._delete_topics(topics) if msg.flow_id in await self.config.keys("flow"): await self.config.delete("flow", msg.flow_id) diff --git a/trustgraph-flow/trustgraph/flow/service/service.py b/trustgraph-flow/trustgraph/flow/service/service.py index a3f2fb6b..e1997452 100644 --- a/trustgraph-flow/trustgraph/flow/service/service.py +++ b/trustgraph-flow/trustgraph/flow/service/service.py @@ -101,11 +101,9 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_queue( - self.flow_request_topic, self.flow_request_subscriber - ) + await self.pubsub.ensure_topic(self.flow_request_topic) await self.config_client.start() - await self.flow.ensure_existing_flow_queues() + await self.flow.ensure_existing_flow_topics() await self.flow_request_consumer.start() async def on_flow_request(self, msg, consumer, flow): diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 83f97bf3..ed005298 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -263,12 +263,8 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_queue( - self.librarian_request_topic, self.librarian_request_subscriber - ) - await self.pubsub.ensure_queue( - self.collection_request_topic, self.collection_request_subscriber - ) + await self.pubsub.ensure_topic(self.librarian_request_topic) + await self.pubsub.ensure_topic(self.collection_request_topic) await super(Processor, self).start() await self.librarian_request_consumer.start() await self.librarian_response_producer.start() diff --git a/trustgraph-vertexai/trustgraph/model/text_completion/googleaistudio/llm.py 
b/trustgraph-vertexai/trustgraph/model/text_completion/googleaistudio/llm.py index 9f431d20..142fc45c 100644 --- a/trustgraph-vertexai/trustgraph/model/text_completion/googleaistudio/llm.py +++ b/trustgraph-vertexai/trustgraph/model/text_completion/googleaistudio/llm.py @@ -18,6 +18,12 @@ import logging # Module logger logger = logging.getLogger(__name__) +from google import genai +from google.genai import types +from google.genai.types import HarmCategory, HarmBlockThreshold +from google.genai.errors import ClientError +from google.api_core.exceptions import ResourceExhausted + from .... exceptions import TooManyRequests from .... base import LlmService, LlmResult, LlmChunk @@ -71,20 +77,20 @@ class Processor(LlmService): block_level = self.HarmBlockThreshold.BLOCK_ONLY_HIGH self.safety_settings = [ - self.types.SafetySetting( - category = self.HarmCategory.HARM_CATEGORY_HATE_SPEECH, + types.SafetySetting( + category = HarmCategory.HARM_CATEGORY_HATE_SPEECH, threshold = block_level, ), - self.types.SafetySetting( - category = self.HarmCategory.HARM_CATEGORY_HARASSMENT, + types.SafetySetting( + category = HarmCategory.HARM_CATEGORY_HARASSMENT, threshold = block_level, ), - self.types.SafetySetting( - category = self.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, + types.SafetySetting( + category = HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, threshold = block_level, ), - self.types.SafetySetting( - category = self.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, + types.SafetySetting( + category = HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, threshold = block_level, ), # There is a documentation conflict on whether or not @@ -104,7 +110,7 @@ class Processor(LlmService): if cache_key not in self.generation_configs: logger.info(f"Creating generation config for '{model_name}' with temperature {effective_temperature}") - self.generation_configs[cache_key] = self.types.GenerateContentConfig( + self.generation_configs[cache_key] = types.GenerateContentConfig( temperature = 
effective_temperature, top_p = 1, top_k = 40, @@ -160,7 +166,7 @@ class Processor(LlmService): # Leave rate limit retries to the default handler raise TooManyRequests() - except self.ClientError as e: + except ClientError as e: # google-genai SDK throws ClientError for 4xx errors if e.code == 429: logger.warning(f"Rate limit exceeded (ClientError 429): {e}") @@ -229,11 +235,11 @@ class Processor(LlmService): logger.debug("Streaming complete") - except self.ResourceExhausted: + except ResourceExhausted: logger.warning("Rate limit exceeded during streaming") raise TooManyRequests() - except self.ClientError as e: + except ClientError as e: # google-genai SDK throws ClientError for 4xx errors if e.code == 429: logger.warning(f"Rate limit exceeded during streaming (ClientError 429): {e}")
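
Taken together, the RabbitMQ and Kafka backends in this patch resolve logical topic identifiers with the same naming rule. A minimal standalone sketch of that rule follows — simplified from the `_parse_topic` methods in the diff, which additionally classify the topic and derive per-class durability; `map_topic_name` is a hypothetical helper name used only for illustration:

```python
def map_topic_name(topic_id: str) -> str:
    """Map class:topicspace:topic to the broker-level name
    topicspace.class.topic (Kafka topic / RabbitMQ exchange name).

    Sketch only: the real _parse_topic methods also return the
    topic class and a durability flag derived from it.
    """
    if ":" not in topic_id:
        # Bare names fall back to the flow class in the tg topicspace,
        # mirroring the fallback branch in the diff.
        return f"tg.flow.{topic_id}"

    parts = topic_id.split(":", 2)
    if len(parts) != 3:
        raise ValueError(
            f"Invalid topic format: {topic_id}, "
            "expected class:topicspace:topic"
        )

    cls, topicspace, topic = parts
    if cls not in ("flow", "request", "response", "notify"):
        raise ValueError(f"Invalid topic class: {cls}")

    return f"{topicspace}.{cls}.{topic}"


print(map_topic_name("flow:tg:text-completion-request"))
# tg.flow.text-completion-request
```

The expected outputs match the examples given in the spec's "Topic Naming" section (`request:tg:librarian` → `tg.request.librarian`, `response:tg:config` → `tg.response.config`).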