Merge branch 'release/v2.3'

This commit is contained in:
Cyber MacGeddon 2026-04-18 12:09:52 +01:00
commit 222537c26b
18 changed files with 1020 additions and 247 deletions

View file

@ -0,0 +1,200 @@
---
layout: default
title: "Kafka Pub/Sub Backend Technical Specification"
parent: "Tech Specs"
---
# Kafka Pub/Sub Backend Technical Specification
## Overview
Add Apache Kafka as a third pub/sub backend alongside Pulsar and RabbitMQ.
Kafka's topic model maps naturally to TrustGraph's pub/sub abstraction:
topics are first-class, consumer groups provide competing-consumer
semantics, and the AdminClient handles topic lifecycle.
## Problem
TrustGraph currently supports only Pulsar and RabbitMQ as pub/sub
backends. Kafka is widely deployed and operationally familiar to many
teams, and its log-based architecture provides durable, replayable
message streams with well-understood scaling properties. Adding a Kafka
backend lets teams that already operate Kafka adopt TrustGraph without
running a second message broker.
## Design
### Concept Mapping
| TrustGraph concept | Kafka equivalent |
|---|---|
| Topic (`class:topicspace:topic`) | Kafka topic (named `topicspace.class.topic`) |
| Subscription (competing consumers) | Consumer group |
| `create_topic` / `delete_topic` | `AdminClient.create_topics()` / `delete_topics()` |
| `ensure_topic` | `AdminClient.create_topics()` (idempotent) |
| Producer | `KafkaProducer` |
| Consumer | `KafkaConsumer` in a consumer group |
| Message acknowledge | Commit offset |
| Message negative acknowledge | Seek back to message offset |
### Topic Naming
The topic name follows the same convention as the RabbitMQ exchange
name:
```
class:topicspace:topic -> topicspace.class.topic
```
Examples:
- `flow:tg:text-completion-request` -> `tg.flow.text-completion-request`
- `request:tg:librarian` -> `tg.request.librarian`
- `response:tg:config` -> `tg.response.config`
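The mapping can be sketched as a small helper. This is an illustrative
sketch only (the function name `parse_topic` is hypothetical; the
backend's real logic lives in `_parse_topic`):

```python
# Sketch of the naming rule: class:topicspace:topic -> topicspace.class.topic
VALID_CLASSES = {"flow", "request", "response", "notify"}

def parse_topic(topic_id: str) -> str:
    """Map a class:topicspace:topic identifier to a Kafka topic name."""
    if ":" not in topic_id:
        # Bare names default to the flow class in the tg topicspace
        return f"tg.flow.{topic_id}"
    parts = topic_id.split(":", 2)       # keep any ':' in the topic part
    if len(parts) != 3:
        raise ValueError(f"Invalid topic format: {topic_id}")
    cls, topicspace, topic = parts
    if cls not in VALID_CLASSES:
        raise ValueError(f"Invalid topic class: {cls}")
    return f"{topicspace}.{cls}.{topic}"

print(parse_topic("flow:tg:text-completion-request"))
# tg.flow.text-completion-request
```

Note that `split(":", 2)` preserves flow suffixes such as
`request:tg:prompt:default` -> `tg.request.prompt:default`.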
### Topic Classes and Retention
Kafka topics are always durable (log-based). The class prefix determines
retention policy rather than durability:
| Class | Retention | Partitions | Notes |
|---|---|---|---|
| `flow` | Long or infinite | 1 | Data pipeline, order preserved |
| `request` | Short (e.g. 300s) | 1 | RPC requests, ephemeral |
| `response` | Short (e.g. 300s) | 1 | RPC responses, shared (see below) |
| `notify` | Short (e.g. 300s) | 1 | Broadcast signals |
Single partition per topic preserves message ordering and makes
offset-based acknowledgment equivalent to per-message ack. This matches
the current `prefetch_count=1` model used across all backends.
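The table collapses to a small lookup. A sketch assuming the 7-day and
300-second defaults used elsewhere in this change (`topic_config` is a
hypothetical helper, not backend API):

```python
# Illustrative topic-config derivation from the retention table above.
LONG_RETENTION_MS = 7 * 24 * 60 * 60 * 1000   # 7 days for flow topics
SHORT_RETENTION_MS = 300 * 1000               # 5 minutes for RPC/notify

def topic_config(cls: str) -> dict:
    """Kafka topic settings for a TrustGraph topic class."""
    return {
        "num_partitions": 1,   # single partition: ordering preserved
        "retention.ms": (
            LONG_RETENTION_MS if cls == "flow" else SHORT_RETENTION_MS
        ),
    }
```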
### Producers
Straightforward `KafkaProducer` wrapping. Messages are serialised as
JSON (consistent with the RabbitMQ backend). Message properties/headers
map to Kafka record headers.
### Consumers
#### Flow and Request Class (Competing Consumers)
Consumer group ID = subscription name. Multiple consumers in the same
group share the workload (Kafka's native consumer group rebalancing).
```
group_id = subscription # e.g. "triples-store--default--input"
```
#### Response and Notify Class (Per-Subscriber)
This is where Kafka differs from RabbitMQ. Kafka has no anonymous
exclusive auto-delete queues.
Design: use a **shared response topic with unique consumer groups**.
Each subscriber gets its own consumer group (using the existing
UUID-based subscription name from `RequestResponseSpec`). Every
subscriber reads all messages from the topic and filters by correlation
ID, discarding non-matching messages.
This is slightly wasteful — N subscribers each read every response — but
request/response traffic is low-volume compared to the data pipeline.
The alternative (per-instance temporary topics) would require dynamic
topic creation/deletion for every API gateway request, which is
expensive in Kafka (AdminClient operations involve controller
coordination).
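The filtering step can be sketched with stubbed messages (dicts stand in
for Kafka records, and the `correlation-id` header name is an assumption
of this sketch; in the real backend the ID travels in record headers):

```python
# Per-subscriber filtering on a shared response topic: each subscriber,
# in its own consumer group, sees every message and keeps only its own.
def matching_responses(messages, correlation_id: str):
    """Yield responses addressed to this subscriber; others are
    discarded (and acknowledged) without processing."""
    for msg in messages:
        if msg["headers"].get("correlation-id") == correlation_id:
            yield msg

stream = [
    {"headers": {"correlation-id": "req-1"}, "body": "a"},
    {"headers": {"correlation-id": "req-2"}, "body": "b"},
    {"headers": {"correlation-id": "req-1"}, "body": "c"},
]
mine = [m["body"] for m in matching_responses(stream, "req-1")]
# mine == ["a", "c"]
```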
### Acknowledgment
#### Acknowledge (Success)
Commit the message's offset. With a single partition and sequential
processing, this is equivalent to per-message ack:
```
consumer.commit(offsets=[TopicPartition(topic, partition, offset + 1)])
```
#### Negative Acknowledge (Failure / Retry)
Kafka has no native nack-with-redelivery. On processing failure, seek
the consumer back to the failed message's offset:
```
consumer.seek(TopicPartition(topic, partition, offset))
```
The message is redelivered on the next poll. This matches the current
RabbitMQ `basic_nack(requeue=True)` behaviour: the message is retried
by the same consumer.
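The offset arithmetic behind both operations can be demonstrated against
a stub consumer (`StubConsumer` is a stand-in for this sketch, not the
confluent-kafka API):

```python
# Ack commits offset + 1 (the next offset to read); nack seeks back to
# the failed offset so the next poll redelivers the same message.
class StubConsumer:
    def __init__(self):
        self.committed = None   # last committed offset
        self.position = None    # position after a seek

    def commit(self, offset):
        self.committed = offset

    def seek(self, offset):
        self.position = offset

def acknowledge(consumer, msg_offset):
    consumer.commit(msg_offset + 1)

def negative_acknowledge(consumer, msg_offset):
    consumer.seek(msg_offset)

c = StubConsumer()
acknowledge(c, 41)            # commits 42: offsets up to 41 are done
negative_acknowledge(c, 41)   # rewinds to 41 for redelivery
```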
### Topic Lifecycle
The flow service creates and deletes topics via the Kafka AdminClient:
- **Flow start**: `AdminClient.create_topics()` for each unique topic
in the blueprint. Topic config includes `retention.ms` based on class.
- **Flow stop**: `AdminClient.delete_topics()` for the flow's topics.
- **Service startup**: `ensure_topic` creates the topic if it doesn't
  exist (idempotent: a `TOPIC_ALREADY_EXISTS` error from `create_topics`
  is treated as success).
Unlike RabbitMQ where consumers declare their own queues, Kafka topics
must exist before consumers connect. The flow service and service
startup `ensure_topic` calls handle this.
### Message Encoding
JSON body, consistent with the RabbitMQ backend. Serialisation uses the
existing `dataclass_to_dict` / `dict_to_dataclass` helpers. Message
properties map to Kafka record headers (byte-encoded string values).
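A round-trip sketch of this encoding, using `dataclasses.asdict` as a
stand-in for the existing `dataclass_to_dict` / `dict_to_dataclass`
helpers (the dataclass and field names here are illustrative):

```python
# JSON body plus byte-encoded string headers, as described above.
import json
from dataclasses import dataclass, asdict

@dataclass
class TextCompletionRequest:
    prompt: str
    max_tokens: int

def encode(message, properties: dict):
    value = json.dumps(asdict(message)).encode("utf-8")
    headers = [(k, str(v).encode("utf-8")) for k, v in properties.items()]
    return value, headers

def decode(value: bytes, cls):
    return cls(**json.loads(value.decode("utf-8")))

value, headers = encode(TextCompletionRequest("hi", 10),
                        {"correlation-id": "x"})
roundtrip = decode(value, TextCompletionRequest)
# roundtrip == TextCompletionRequest(prompt='hi', max_tokens=10)
```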
### Configuration
New CLI arguments following the existing pattern:
```
--pubsub-backend kafka
--kafka-bootstrap-servers localhost:9092
--kafka-security-protocol PLAINTEXT
--kafka-sasl-mechanism (optional)
--kafka-sasl-username (optional)
--kafka-sasl-password (optional)
```
The factory in `pubsub.py` creates a `KafkaBackend` instance when
`pubsub_backend='kafka'`.
### Dependencies
`kafka-python-ng` or `confluent-kafka`. The `confluent-kafka` package
provides producer/consumer and AdminClient in one library with better
performance (C-backed librdkafka), but needs a native build on platforms
without prebuilt wheels. `kafka-python-ng` is pure Python and simpler to
install everywhere.
## Key Design Decisions
1. **Shared response topic with filtering** over per-instance temporary
topics. Avoids expensive dynamic topic creation for every RPC
exchange. Acceptable because response traffic is low-volume.
2. **Seek-back for negative acknowledge** over not-committing or retry
topics. Provides immediate redelivery consistent with the RabbitMQ
nack behaviour.
3. **Single partition per topic** to preserve ordering and simplify
offset management. Parallelism comes from multiple topics and
multiple services, not from partitioning within a topic.
4. **Retention-based class semantics** instead of durability flags.
Kafka topics are always durable; short retention achieves the
ephemeral behaviour needed for request/response/notify classes.
## Open Questions
- **Retention values**: exact `retention.ms` for short-lived topic
classes. 300s (5 minutes) is a starting point; may need tuning based
on worst-case restart/reconnect times.
- **Library choice**: `confluent-kafka` vs `kafka-python-ng`. Performance
vs install simplicity trade-off. Could support both behind a thin
wrapper.
- **Consumer poll timeout**: needs to align with the existing
`receive(timeout_millis)` API. Kafka's `poll()` takes a timeout
directly, so this maps cleanly.

View file

@ -0,0 +1,131 @@
"""
Unit tests for Kafka backend topic parsing and factory dispatch.
Does not require a running Kafka instance.
"""
import pytest
import argparse
from trustgraph.base.kafka_backend import KafkaBackend
from trustgraph.base.pubsub import get_pubsub, add_pubsub_args
class TestKafkaParseTopic:
@pytest.fixture
def backend(self):
b = object.__new__(KafkaBackend)
return b
def test_flow_is_durable(self, backend):
name, cls, durable = backend._parse_topic('flow:tg:text-completion-request')
assert durable is True
assert cls == 'flow'
assert name == 'tg.flow.text-completion-request'
def test_notify_is_not_durable(self, backend):
name, cls, durable = backend._parse_topic('notify:tg:config')
assert durable is False
assert cls == 'notify'
assert name == 'tg.notify.config'
def test_request_is_not_durable(self, backend):
name, cls, durable = backend._parse_topic('request:tg:config')
assert durable is False
assert cls == 'request'
assert name == 'tg.request.config'
def test_response_is_not_durable(self, backend):
name, cls, durable = backend._parse_topic('response:tg:librarian')
assert durable is False
assert cls == 'response'
assert name == 'tg.response.librarian'
def test_custom_topicspace(self, backend):
name, cls, durable = backend._parse_topic('flow:prod:my-queue')
assert name == 'prod.flow.my-queue'
assert durable is True
def test_no_colon_defaults_to_flow(self, backend):
name, cls, durable = backend._parse_topic('simple-queue')
assert name == 'tg.flow.simple-queue'
assert cls == 'flow'
assert durable is True
def test_invalid_class_raises(self, backend):
with pytest.raises(ValueError, match="Invalid topic class"):
backend._parse_topic('unknown:tg:topic')
def test_topic_with_flow_suffix(self, backend):
"""Topic names with flow suffix (e.g. :default) are preserved."""
name, cls, durable = backend._parse_topic('request:tg:prompt:default')
assert name == 'tg.request.prompt:default'
class TestKafkaRetention:
@pytest.fixture
def backend(self):
b = object.__new__(KafkaBackend)
return b
def test_flow_gets_long_retention(self, backend):
assert backend._retention_ms('flow') == 7 * 24 * 60 * 60 * 1000
def test_request_gets_short_retention(self, backend):
assert backend._retention_ms('request') == 300 * 1000
def test_response_gets_short_retention(self, backend):
assert backend._retention_ms('response') == 300 * 1000
def test_notify_gets_short_retention(self, backend):
assert backend._retention_ms('notify') == 300 * 1000
class TestGetPubsubKafka:
def test_factory_creates_kafka_backend(self):
backend = get_pubsub(pubsub_backend='kafka')
assert isinstance(backend, KafkaBackend)
def test_factory_passes_config(self):
backend = get_pubsub(
pubsub_backend='kafka',
kafka_bootstrap_servers='myhost:9093',
kafka_security_protocol='SASL_SSL',
kafka_sasl_mechanism='PLAIN',
kafka_sasl_username='user',
kafka_sasl_password='pass',
)
assert isinstance(backend, KafkaBackend)
assert backend._bootstrap_servers == 'myhost:9093'
assert backend._admin_config['security.protocol'] == 'SASL_SSL'
assert backend._admin_config['sasl.mechanism'] == 'PLAIN'
assert backend._admin_config['sasl.username'] == 'user'
assert backend._admin_config['sasl.password'] == 'pass'
class TestAddPubsubArgsKafka:
def test_kafka_args_present(self):
parser = argparse.ArgumentParser()
add_pubsub_args(parser)
args = parser.parse_args([
'--pubsub-backend', 'kafka',
'--kafka-bootstrap-servers', 'myhost:9093',
])
assert args.pubsub_backend == 'kafka'
assert args.kafka_bootstrap_servers == 'myhost:9093'
def test_kafka_defaults_container(self):
parser = argparse.ArgumentParser()
add_pubsub_args(parser)
args = parser.parse_args([])
assert args.kafka_bootstrap_servers == 'kafka:9092'
assert args.kafka_security_protocol == 'PLAINTEXT'
def test_kafka_standalone_defaults_to_localhost(self):
parser = argparse.ArgumentParser()
add_pubsub_args(parser, standalone=True)
args = parser.parse_args([])
assert args.kafka_bootstrap_servers == 'localhost:9092'

View file

@ -1,18 +1,16 @@
 """
-Unit tests for RabbitMQ backend queue name mapping and factory dispatch.
+Unit tests for RabbitMQ backend topic parsing and factory dispatch.
 Does not require a running RabbitMQ instance.
 """
 import pytest
 import argparse
+pika = pytest.importorskip("pika", reason="pika not installed")
 from trustgraph.base.rabbitmq_backend import RabbitMQBackend
 from trustgraph.base.pubsub import get_pubsub, add_pubsub_args
-class TestRabbitMQMapQueueName:
+class TestRabbitMQParseTopic:
     @pytest.fixture
     def backend(self):
@ -20,43 +18,48 @@ class TestRabbitMQMapQueueName:
         return b
     def test_flow_is_durable(self, backend):
-        name, durable = backend.map_queue_name('flow:tg:text-completion-request')
+        exchange, cls, durable = backend._parse_topic('flow:tg:text-completion-request')
         assert durable is True
-        assert name == 'tg.flow.text-completion-request'
+        assert cls == 'flow'
+        assert exchange == 'tg.flow.text-completion-request'
     def test_notify_is_not_durable(self, backend):
-        name, durable = backend.map_queue_name('notify:tg:config')
+        exchange, cls, durable = backend._parse_topic('notify:tg:config')
         assert durable is False
-        assert name == 'tg.notify.config'
+        assert cls == 'notify'
+        assert exchange == 'tg.notify.config'
     def test_request_is_not_durable(self, backend):
-        name, durable = backend.map_queue_name('request:tg:config')
+        exchange, cls, durable = backend._parse_topic('request:tg:config')
         assert durable is False
-        assert name == 'tg.request.config'
+        assert cls == 'request'
+        assert exchange == 'tg.request.config'
     def test_response_is_not_durable(self, backend):
-        name, durable = backend.map_queue_name('response:tg:librarian')
+        exchange, cls, durable = backend._parse_topic('response:tg:librarian')
         assert durable is False
-        assert name == 'tg.response.librarian'
+        assert cls == 'response'
+        assert exchange == 'tg.response.librarian'
     def test_custom_topicspace(self, backend):
-        name, durable = backend.map_queue_name('flow:prod:my-queue')
-        assert name == 'prod.flow.my-queue'
+        exchange, cls, durable = backend._parse_topic('flow:prod:my-queue')
+        assert exchange == 'prod.flow.my-queue'
         assert durable is True
     def test_no_colon_defaults_to_flow(self, backend):
-        name, durable = backend.map_queue_name('simple-queue')
-        assert name == 'tg.simple-queue'
-        assert durable is False
+        exchange, cls, durable = backend._parse_topic('simple-queue')
+        assert exchange == 'tg.flow.simple-queue'
+        assert cls == 'flow'
+        assert durable is True
     def test_invalid_class_raises(self, backend):
-        with pytest.raises(ValueError, match="Invalid queue class"):
-            backend.map_queue_name('unknown:tg:topic')
+        with pytest.raises(ValueError, match="Invalid topic class"):
+            backend._parse_topic('unknown:tg:topic')
-    def test_flow_with_flow_suffix(self, backend):
-        """Queue names with flow suffix (e.g. :default) are preserved."""
-        name, durable = backend.map_queue_name('request:tg:prompt:default')
-        assert name == 'tg.request.prompt:default'
+    def test_topic_with_flow_suffix(self, backend):
+        """Topic names with flow suffix (e.g. :default) are preserved."""
+        exchange, cls, durable = backend._parse_topic('request:tg:prompt:default')
+        assert exchange == 'tg.request.prompt:default'
 class TestGetPubsubRabbitMQ:

View file

@ -15,6 +15,7 @@ dependencies = [
     "requests",
     "python-logging-loki",
     "pika",
+    "confluent-kafka",
     "pyyaml",
 ]
 classifiers = [

View file

@ -121,7 +121,7 @@ class PubSubBackend(Protocol):
         Create a producer for a topic.
         Args:
-            topic: Generic topic format (qos/tenant/namespace/queue)
+            topic: Queue identifier in class:topicspace:topic format
             schema: Dataclass type for messages
             **options: Backend-specific options (e.g., chunking_enabled)
@ -159,59 +159,55 @@ class PubSubBackend(Protocol):
         """
         ...
-    async def create_queue(self, topic: str, subscription: str) -> None:
+    async def create_topic(self, topic: str) -> None:
         """
-        Pre-create a queue so it exists before any consumer connects.
-        The topic and subscription together identify the queue, mirroring
-        create_consumer where the queue name is derived from both.
-        Idempotent: creating an already-existing queue succeeds silently.
+        Create the broker-side resources for a logical topic.
+        For RabbitMQ this creates a fanout exchange. For Pulsar this is
+        a no-op (topics auto-create on first use).
+        Idempotent: creating an already-existing topic succeeds silently.
         Args:
-            topic: Queue identifier in class:topicspace:topic format
-            subscription: Subscription/consumer group name
+            topic: Topic identifier in class:topicspace:topic format
         """
         ...
-    async def delete_queue(self, topic: str, subscription: str) -> None:
+    async def delete_topic(self, topic: str) -> None:
         """
-        Delete a queue and any messages it contains.
-        The topic and subscription together identify the queue, mirroring
-        create_consumer where the queue name is derived from both.
-        Idempotent: deleting a non-existent queue succeeds silently.
+        Delete a topic and discard any in-flight messages.
+        For RabbitMQ this deletes the fanout exchange; consumer queues
+        lose their binding and drain naturally.
+        Idempotent: deleting a non-existent topic succeeds silently.
         Args:
-            topic: Queue identifier in class:topicspace:topic format
-            subscription: Subscription/consumer group name
+            topic: Topic identifier in class:topicspace:topic format
         """
         ...
-    async def queue_exists(self, topic: str, subscription: str) -> bool:
+    async def topic_exists(self, topic: str) -> bool:
         """
-        Check whether a queue exists.
+        Check whether a topic exists.
         Args:
-            topic: Queue identifier in class:topicspace:topic format
-            subscription: Subscription/consumer group name
+            topic: Topic identifier in class:topicspace:topic format
         Returns:
-            True if the queue exists, False otherwise.
+            True if the topic exists, False otherwise.
         """
         ...
-    async def ensure_queue(self, topic: str, subscription: str) -> None:
+    async def ensure_topic(self, topic: str) -> None:
         """
-        Ensure a queue exists, creating it if necessary.
+        Ensure a topic exists, creating it if necessary.
         Convenience wrapper: checks existence, creates if missing.
-        Used by system services on startup.
+        Used by the flow service and system services on startup.
         Args:
-            topic: Queue identifier in class:topicspace:topic format
-            subscription: Subscription/consumer group name
+            topic: Topic identifier in class:topicspace:topic format
         """
         ...

View file

@ -0,0 +1,400 @@
"""
Kafka backend implementation for pub/sub abstraction.
Each logical topic maps to a Kafka topic. The topic name encodes
the full identity:
class:topicspace:topic -> topicspace.class.topic
Producers publish to the topic directly.
Consumers use consumer groups for competing-consumer semantics:
- flow / request: named consumer group (competing consumers)
- response / notify: unique consumer group per instance, filtering
messages by correlation ID (all subscribers see all messages)
The flow service manages topic lifecycle via AdminClient.
Architecture:
Producer --> [Kafka topic] --> Consumer Group A --> Consumer
                           --> Consumer Group A --> Consumer
                           --> Consumer Group B --> Consumer (response)
"""
import asyncio
import json
import logging
import uuid
from typing import Any
from confluent_kafka import (
Producer as KafkaProducer,
Consumer as KafkaConsumer,
TopicPartition,
KafkaError,
KafkaException,
)
from confluent_kafka.admin import AdminClient, NewTopic
from .backend import PubSubBackend, BackendProducer, BackendConsumer, Message
from .serialization import dataclass_to_dict, dict_to_dataclass
logger = logging.getLogger(__name__)
# Retention defaults (milliseconds)
LONG_RETENTION_MS = 7 * 24 * 60 * 60 * 1000 # 7 days
SHORT_RETENTION_MS = 300 * 1000 # 5 minutes
class KafkaMessage:
"""Wrapper for Kafka messages to match Message protocol."""
def __init__(self, msg, schema_cls):
self._msg = msg
self._schema_cls = schema_cls
self._value = None
def value(self) -> Any:
"""Deserialize and return the message value as a dataclass."""
if self._value is None:
data_dict = json.loads(self._msg.value().decode('utf-8'))
self._value = dict_to_dataclass(data_dict, self._schema_cls)
return self._value
def properties(self) -> dict:
"""Return message properties from Kafka headers."""
headers = self._msg.headers() or []
return {
k: v.decode('utf-8') if isinstance(v, bytes) else v
for k, v in headers
}
class KafkaBackendProducer:
"""Publishes messages to a Kafka topic.
confluent-kafka Producer is thread-safe, so a single instance
can be shared across threads.
"""
def __init__(self, bootstrap_servers, topic_name, durable):
self._topic_name = topic_name
self._durable = durable
self._producer = KafkaProducer({
'bootstrap.servers': bootstrap_servers,
'acks': 'all' if durable else '1',
})
def send(self, message: Any, properties: dict | None = None) -> None:
data_dict = dataclass_to_dict(message)
json_data = json.dumps(data_dict).encode('utf-8')
headers = [
(k, str(v).encode('utf-8'))
for k, v in properties.items()
] if properties else None
self._producer.produce(
topic=self._topic_name,
value=json_data,
headers=headers,
)
self._producer.flush()
def flush(self) -> None:
self._producer.flush()
def close(self) -> None:
self._producer.flush()
class KafkaBackendConsumer:
"""Consumes from a Kafka topic using a consumer group.
Uses confluent-kafka Consumer.poll() for message delivery.
Not thread-safe each instance must be used from a single thread,
which matches the ThreadPoolExecutor pattern in consumer.py.
"""
def __init__(self, bootstrap_servers, topic_name, group_id,
schema_cls, auto_offset_reset='latest'):
self._bootstrap_servers = bootstrap_servers
self._topic_name = topic_name
self._group_id = group_id
self._schema_cls = schema_cls
self._auto_offset_reset = auto_offset_reset
self._consumer = None
def _connect(self):
self._consumer = KafkaConsumer({
'bootstrap.servers': self._bootstrap_servers,
'group.id': self._group_id,
'auto.offset.reset': self._auto_offset_reset,
'enable.auto.commit': False,
})
self._consumer.subscribe([self._topic_name])
logger.info(
f"Kafka consumer connected: topic={self._topic_name}, "
f"group={self._group_id}"
)
def _is_alive(self):
return self._consumer is not None
def ensure_connected(self) -> None:
"""Eagerly connect and subscribe.
For response/notify consumers this must be called before the
corresponding request is published, so that the consumer is
assigned a partition and will see the response message.
"""
if not self._is_alive():
self._connect()
# Force a partition assignment by polling briefly.
# Without this, the consumer may not be assigned partitions
# until the first real poll(), creating a race where the
# request is sent before assignment completes.
msg = self._consumer.poll(timeout=1.0)
if msg is not None and not msg.error():
# A real message can arrive during this warm-up poll; seek back
# so the next receive() delivers it instead of silently skipping it.
self._consumer.seek(TopicPartition(
msg.topic(), msg.partition(), msg.offset(),
))
def receive(self, timeout_millis: int = 2000) -> Message:
"""Receive a message. Raises TimeoutError if none available."""
if not self._is_alive():
self._connect()
timeout_seconds = timeout_millis / 1000.0
msg = self._consumer.poll(timeout=timeout_seconds)
if msg is None:
raise TimeoutError("No message received within timeout")
if msg.error():
error = msg.error()
if error.code() == KafkaError._PARTITION_EOF:
raise TimeoutError("End of partition reached")
raise KafkaException(error)
return KafkaMessage(msg, self._schema_cls)
def acknowledge(self, message: Message) -> None:
"""Commit the message's offset (next offset to read)."""
if isinstance(message, KafkaMessage) and message._msg:
tp = TopicPartition(
message._msg.topic(),
message._msg.partition(),
message._msg.offset() + 1,
)
self._consumer.commit(offsets=[tp], asynchronous=False)
def negative_acknowledge(self, message: Message) -> None:
"""Seek back to the message's offset for redelivery."""
if isinstance(message, KafkaMessage) and message._msg:
tp = TopicPartition(
message._msg.topic(),
message._msg.partition(),
message._msg.offset(),
)
self._consumer.seek(tp)
def unsubscribe(self) -> None:
if self._consumer:
try:
self._consumer.unsubscribe()
except Exception:
pass
def close(self) -> None:
if self._consumer:
try:
self._consumer.close()
except Exception:
pass
self._consumer = None
class KafkaBackend:
"""Kafka pub/sub backend using one topic per logical topic."""
def __init__(self, bootstrap_servers='localhost:9092',
security_protocol='PLAINTEXT',
sasl_mechanism=None,
sasl_username=None,
sasl_password=None):
self._bootstrap_servers = bootstrap_servers
# AdminClient config
self._admin_config = {
'bootstrap.servers': bootstrap_servers,
}
if security_protocol != 'PLAINTEXT':
self._admin_config['security.protocol'] = security_protocol
if sasl_mechanism:
self._admin_config['sasl.mechanism'] = sasl_mechanism
if sasl_username:
self._admin_config['sasl.username'] = sasl_username
if sasl_password:
self._admin_config['sasl.password'] = sasl_password
logger.info(
f"Kafka backend: {bootstrap_servers} "
f"protocol={security_protocol}"
)
def _parse_topic(self, topic_id: str) -> tuple[str, str, bool]:
"""
Parse topic identifier into Kafka topic name, class, and durability.
Format: class:topicspace:topic
Returns: (topic_name, class, durable)
"""
if ':' not in topic_id:
return f'tg.flow.{topic_id}', 'flow', True
parts = topic_id.split(':', 2)
if len(parts) != 3:
raise ValueError(
f"Invalid topic format: {topic_id}, "
f"expected class:topicspace:topic"
)
cls, topicspace, topic = parts
if cls == 'flow':
durable = True
elif cls in ('request', 'response', 'notify'):
durable = False
else:
raise ValueError(
f"Invalid topic class: {cls}, "
f"expected flow, request, response, or notify"
)
topic_name = f"{topicspace}.{cls}.{topic}"
return topic_name, cls, durable
def _retention_ms(self, cls):
"""Return retention.ms for a topic class."""
if cls == 'flow':
return LONG_RETENTION_MS
return SHORT_RETENTION_MS
def create_producer(self, topic: str, schema: type,
**options) -> BackendProducer:
topic_name, cls, durable = self._parse_topic(topic)
logger.debug(f"Creating producer: topic={topic_name}")
return KafkaBackendProducer(
self._bootstrap_servers, topic_name, durable,
)
def create_consumer(self, topic: str, subscription: str, schema: type,
initial_position: str = 'latest',
**options) -> BackendConsumer:
"""Create a consumer subscribed to a Kafka topic.
Behaviour is determined by the topic's class prefix:
- flow: named consumer group, competing consumers
- request: named consumer group, competing consumers
- response: unique consumer group per instance
- notify: unique consumer group per instance
"""
topic_name, cls, durable = self._parse_topic(topic)
if cls in ('response', 'notify'):
# Per-subscriber: unique group so every instance sees
# every message. Filtering by correlation ID happens in
# the Subscriber layer above.
group_id = f"{subscription}-{uuid.uuid4()}"
auto_offset_reset = 'latest'
else:
# Shared: named group, competing consumers
group_id = subscription
auto_offset_reset = (
'earliest' if initial_position == 'earliest'
else 'latest'
)
logger.debug(
f"Creating consumer: topic={topic_name}, "
f"group={group_id}, cls={cls}"
)
return KafkaBackendConsumer(
self._bootstrap_servers, topic_name, group_id,
schema, auto_offset_reset,
)
def _create_topic_sync(self, topic_name, retention_ms):
"""Blocking topic creation via AdminClient."""
admin = AdminClient(self._admin_config)
new_topic = NewTopic(
topic_name,
num_partitions=1,
replication_factor=1,
config={
'retention.ms': str(retention_ms),
},
)
fs = admin.create_topics([new_topic])
for name, f in fs.items():
try:
f.result()
logger.info(f"Created topic: {name}")
except KafkaException as e:
# Topic already exists — idempotent
if e.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS:
logger.debug(f"Topic already exists: {name}")
else:
raise
async def create_topic(self, topic: str) -> None:
"""Create a Kafka topic with appropriate retention."""
topic_name, cls, durable = self._parse_topic(topic)
retention_ms = self._retention_ms(cls)
await asyncio.to_thread(
self._create_topic_sync, topic_name, retention_ms,
)
def _delete_topic_sync(self, topic_name):
"""Blocking topic deletion via AdminClient."""
admin = AdminClient(self._admin_config)
fs = admin.delete_topics([topic_name])
for name, f in fs.items():
try:
f.result()
logger.info(f"Deleted topic: {name}")
except KafkaException as e:
# Topic doesn't exist — idempotent
if e.args[0].code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
logger.debug(f"Topic not found: {name}")
else:
raise
except Exception as e:
logger.debug(f"Topic delete for {name}: {e}")
async def delete_topic(self, topic: str) -> None:
"""Delete a Kafka topic."""
topic_name, cls, durable = self._parse_topic(topic)
await asyncio.to_thread(self._delete_topic_sync, topic_name)
def _topic_exists_sync(self, topic_name):
"""Blocking topic existence check via AdminClient."""
admin = AdminClient(self._admin_config)
metadata = admin.list_topics(timeout=10)
return topic_name in metadata.topics
async def topic_exists(self, topic: str) -> bool:
"""Check whether a Kafka topic exists."""
topic_name, cls, durable = self._parse_topic(topic)
return await asyncio.to_thread(
self._topic_exists_sync, topic_name,
)
async def ensure_topic(self, topic: str) -> None:
"""Ensure a topic exists, creating it if necessary."""
if not await self.topic_exists(topic):
await self.create_topic(topic)
def close(self) -> None:
pass

View file

@ -17,6 +17,12 @@ DEFAULT_RABBITMQ_USERNAME = os.getenv("RABBITMQ_USERNAME", 'guest')
DEFAULT_RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD", 'guest') DEFAULT_RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD", 'guest')
DEFAULT_RABBITMQ_VHOST = os.getenv("RABBITMQ_VHOST", '/') DEFAULT_RABBITMQ_VHOST = os.getenv("RABBITMQ_VHOST", '/')
DEFAULT_KAFKA_BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP_SERVERS", 'kafka:9092')
DEFAULT_KAFKA_PROTOCOL = os.getenv("KAFKA_SECURITY_PROTOCOL", 'PLAINTEXT')
DEFAULT_KAFKA_SASL_MECHANISM = os.getenv("KAFKA_SASL_MECHANISM", None)
DEFAULT_KAFKA_SASL_USERNAME = os.getenv("KAFKA_SASL_USERNAME", None)
DEFAULT_KAFKA_SASL_PASSWORD = os.getenv("KAFKA_SASL_PASSWORD", None)
def get_pubsub(**config: Any) -> Any: def get_pubsub(**config: Any) -> Any:
""" """
@ -47,6 +53,25 @@ def get_pubsub(**config: Any) -> Any:
password=config.get('rabbitmq_password', DEFAULT_RABBITMQ_PASSWORD), password=config.get('rabbitmq_password', DEFAULT_RABBITMQ_PASSWORD),
vhost=config.get('rabbitmq_vhost', DEFAULT_RABBITMQ_VHOST), vhost=config.get('rabbitmq_vhost', DEFAULT_RABBITMQ_VHOST),
) )
elif backend_type == 'kafka':
from .kafka_backend import KafkaBackend
return KafkaBackend(
bootstrap_servers=config.get(
'kafka_bootstrap_servers', DEFAULT_KAFKA_BOOTSTRAP,
),
security_protocol=config.get(
'kafka_security_protocol', DEFAULT_KAFKA_PROTOCOL,
),
sasl_mechanism=config.get(
'kafka_sasl_mechanism', DEFAULT_KAFKA_SASL_MECHANISM,
),
sasl_username=config.get(
'kafka_sasl_username', DEFAULT_KAFKA_SASL_USERNAME,
),
sasl_password=config.get(
'kafka_sasl_password', DEFAULT_KAFKA_SASL_PASSWORD,
),
)
else: else:
raise ValueError(f"Unknown pub/sub backend: {backend_type}") raise ValueError(f"Unknown pub/sub backend: {backend_type}")
@@ -65,6 +90,7 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None:
     pulsar_host = STANDALONE_PULSAR_HOST if standalone else DEFAULT_PULSAR_HOST
     pulsar_listener = 'localhost' if standalone else None
     rabbitmq_host = 'localhost' if standalone else DEFAULT_RABBITMQ_HOST
+    kafka_bootstrap = 'localhost:9092' if standalone else DEFAULT_KAFKA_BOOTSTRAP

     parser.add_argument(
         '--pubsub-backend',
@@ -122,3 +148,34 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None:
         default=DEFAULT_RABBITMQ_VHOST,
         help=f'RabbitMQ vhost (default: {DEFAULT_RABBITMQ_VHOST})',
     )
+
+    # Kafka options
+    parser.add_argument(
+        '--kafka-bootstrap-servers',
+        default=kafka_bootstrap,
+        help=f'Kafka bootstrap servers (default: {kafka_bootstrap})',
+    )
+    parser.add_argument(
+        '--kafka-security-protocol',
+        default=DEFAULT_KAFKA_PROTOCOL,
+        help=f'Kafka security protocol (default: {DEFAULT_KAFKA_PROTOCOL})',
+    )
+    parser.add_argument(
+        '--kafka-sasl-mechanism',
+        default=DEFAULT_KAFKA_SASL_MECHANISM,
+        help='Kafka SASL mechanism',
+    )
+    parser.add_argument(
+        '--kafka-sasl-username',
+        default=DEFAULT_KAFKA_SASL_USERNAME,
+        help='Kafka SASL username',
+    )
+    parser.add_argument(
+        '--kafka-sasl-password',
+        default=DEFAULT_KAFKA_SASL_PASSWORD,
+        help='Kafka SASL password',
+    )
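As a sketch of how the CLI arguments above could be turned into client keyword arguments, the following illustrative helper (not part of the diff; the function and key names are assumptions) omits SASL credentials unless the security protocol actually needs them:

```python
def kafka_client_kwargs(args: dict) -> dict:
    """Build Kafka client kwargs from parsed pub/sub CLI options.

    Illustrative only: mirrors the --kafka-* arguments defined above.
    SASL fields are included only when the protocol requires them.
    """
    kwargs = {
        "bootstrap_servers": args["kafka_bootstrap_servers"],
        "security_protocol": args["kafka_security_protocol"],
    }
    # SASL_PLAINTEXT / SASL_SSL need mechanism and credentials;
    # PLAINTEXT / SSL do not.
    if kwargs["security_protocol"].startswith("SASL"):
        kwargs["sasl_mechanism"] = args["kafka_sasl_mechanism"]
        kwargs["sasl_plain_username"] = args["kafka_sasl_username"]
        kwargs["sasl_plain_password"] = args["kafka_sasl_password"]
    return kwargs
```

A config built this way can be splatted into a producer or consumer constructor; dropping unused SASL keys avoids passing credentials to brokers that would reject them.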


@@ -266,22 +266,22 @@ class PulsarBackend:
         return PulsarBackendConsumer(pulsar_consumer, schema)

-    async def create_queue(self, topic: str, subscription: str) -> None:
+    async def create_topic(self, topic: str) -> None:
         """No-op — Pulsar auto-creates topics on first use.
         TODO: Use admin REST API for explicit persistent topic creation."""
         pass

-    async def delete_queue(self, topic: str, subscription: str) -> None:
+    async def delete_topic(self, topic: str) -> None:
         """No-op — to be replaced with admin REST API calls.
-        TODO: Delete subscription and persistent topic via admin API."""
+        TODO: Delete persistent topic via admin API."""
         pass

-    async def queue_exists(self, topic: str, subscription: str) -> bool:
+    async def topic_exists(self, topic: str) -> bool:
         """Returns True — Pulsar auto-creates on subscribe.
         TODO: Use admin REST API for actual existence check."""
         return True

-    async def ensure_queue(self, topic: str, subscription: str) -> None:
+    async def ensure_topic(self, topic: str) -> None:
         """No-op — Pulsar auto-creates topics on first use.
         TODO: Use admin REST API for explicit creation."""
         pass


@@ -1,22 +1,24 @@
 """
 RabbitMQ backend implementation for pub/sub abstraction.

-Uses a single topic exchange per topicspace. The logical queue name
-becomes the routing key. Consumer behavior is determined by the
-subscription name:
-- Same subscription + same topic = shared queue (competing consumers)
-- Different subscriptions = separate queues (broadcast / fan-out)
-This mirrors Pulsar's subscription model using idiomatic RabbitMQ.
+Each logical topic maps to its own fanout exchange. The exchange name
+encodes the full topic identity:
+
+    class:topicspace:topic  ->  exchange topicspace.class.topic
+
+Producers publish to the exchange with an empty routing key.
+Consumers declare and bind their own queues:
+- flow / request: named durable/non-durable queue (competing consumers)
+- response / notify: anonymous exclusive auto-delete queue (per-subscriber)
+
+The flow service manages topic lifecycle (create/delete exchanges).
+Consumers manage their own queue lifecycle (declare + bind on connect).

 Architecture:
-    Producer --> [tg exchange] --routing key--> [named queue] --> Consumer
-                               --routing key--> [named queue] --> Consumer
-                               --routing key--> [exclusive q] --> Subscriber
+    Producer --> [fanout exchange] --> [named queue] --> Consumer
+                                   --> [named queue] --> Consumer
+                                   --> [exclusive queue] --> Subscriber
-
-Uses basic_consume (push) instead of basic_get (polling) for
-efficient message delivery.
 """
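The semantics in the docstring above can be sketched as a toy model (plain Python, not pika): a fanout exchange copies every message to each bound queue, so broadcast vs. competing-consumer behaviour is decided entirely by how many queues are bound. Class and method names here are illustrative.

```python
class FanoutExchange:
    """Toy fanout exchange: each published message is copied to
    every bound queue, regardless of routing key."""

    def __init__(self):
        self.queues = []

    def bind(self, queue):
        # A queue is just a list here; binding registers it
        self.queues.append(queue)

    def publish(self, msg):
        # Fanout: every bound queue receives its own copy
        for q in self.queues:
            q.append(msg)
```

Two subscribers with their own queues each see every message (response/notify classes); several workers sharing one bound queue would instead split the messages between them (flow/request classes).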
 import asyncio
@@ -58,18 +60,16 @@ class RabbitMQMessage:

 class RabbitMQBackendProducer:
-    """Publishes messages to a topic exchange with a routing key.
+    """Publishes messages to a fanout exchange.

     Uses thread-local connections so each thread gets its own
     connection/channel. This avoids wire corruption from concurrent
     threads writing to the same socket (pika is not thread-safe).
     """

-    def __init__(self, connection_params, exchange_name, routing_key,
-                 durable):
+    def __init__(self, connection_params, exchange_name, durable):
         self._connection_params = connection_params
         self._exchange_name = exchange_name
-        self._routing_key = routing_key
         self._durable = durable
         self._local = threading.local()
@@ -90,7 +90,7 @@ class RabbitMQBackendProducer:
         chan = conn.channel()
         chan.exchange_declare(
             exchange=self._exchange_name,
-            exchange_type='topic',
+            exchange_type='fanout',
             durable=True,
         )
         self._local.connection = conn
@@ -113,7 +113,7 @@ class RabbitMQBackendProducer:
         channel = self._get_channel()
         channel.basic_publish(
             exchange=self._exchange_name,
-            routing_key=self._routing_key,
+            routing_key='',
             body=json_data.encode('utf-8'),
             properties=amqp_properties,
         )
@@ -144,19 +144,17 @@ class RabbitMQBackendProducer:

 class RabbitMQBackendConsumer:
-    """Consumes from a queue bound to a topic exchange.
+    """Consumes from a queue bound to a fanout exchange.

     Uses basic_consume (push model) with messages delivered to an
     internal thread-safe queue. process_data_events() drives both
     message delivery and heartbeat processing.
     """

-    def __init__(self, connection_params, exchange_name, routing_key,
-                 queue_name, schema_cls, durable, exclusive=False,
-                 auto_delete=False):
+    def __init__(self, connection_params, exchange_name, queue_name,
+                 schema_cls, durable, exclusive=False, auto_delete=False):
         self._connection_params = connection_params
         self._exchange_name = exchange_name
-        self._routing_key = routing_key
         self._queue_name = queue_name
         self._schema_cls = schema_cls
         self._durable = durable
@@ -171,17 +169,16 @@ class RabbitMQBackendConsumer:
         self._connection = pika.BlockingConnection(self._connection_params)
         self._channel = self._connection.channel()

-        # Declare the topic exchange (idempotent, also done by producers)
+        # Declare the fanout exchange (idempotent)
         self._channel.exchange_declare(
             exchange=self._exchange_name,
-            exchange_type='topic',
+            exchange_type='fanout',
             durable=True,
         )

         if self._exclusive:
             # Anonymous ephemeral queue (response/notify class).
-            # These are per-consumer and must be created here — the
-            # broker assigns the name.
+            # Per-consumer, broker assigns the name.
             result = self._channel.queue_declare(
                 queue='',
                 durable=False,
@@ -189,18 +186,20 @@ class RabbitMQBackendConsumer:
                 auto_delete=True,
             )
             self._queue_name = result.method.queue
-            self._channel.queue_bind(
-                queue=self._queue_name,
-                exchange=self._exchange_name,
-                routing_key=self._routing_key,
-            )
         else:
-            # Named queue (flow/request class). Queue must already
-            # exist — created by the flow service or ensure_queue.
-            # We just verify it exists and bind to consume.
-            self._channel.queue_declare(
-                queue=self._queue_name, passive=True,
-            )
+            # Named queue (flow/request class).
+            # Consumer owns its queue — declare and bind here.
+            self._channel.queue_declare(
+                queue=self._queue_name,
+                durable=self._durable,
+                exclusive=False,
+                auto_delete=False,
+            )
+
+        # Bind queue to the fanout exchange
+        self._channel.queue_bind(
+            queue=self._queue_name,
+            exchange=self._exchange_name,
+        )

         self._channel.basic_qos(prefetch_count=1)
@@ -318,7 +317,7 @@

 class RabbitMQBackend:
-    """RabbitMQ pub/sub backend using a topic exchange per topicspace."""
+    """RabbitMQ pub/sub backend using one fanout exchange per topic."""

     def __init__(self, host='localhost', port=5672, username='guest',
                  password='guest', vhost='/'):
@@ -331,20 +330,23 @@ class RabbitMQBackend:
         )
         logger.info(f"RabbitMQ backend: {host}:{port} vhost={vhost}")

-    def _parse_queue_id(self, queue_id: str) -> tuple[str, str, str, bool]:
+    def _parse_topic(self, topic_id: str) -> tuple[str, str, bool]:
         """
-        Parse queue identifier into exchange, routing key, and durability.
+        Parse topic identifier into exchange name and durability.

         Format: class:topicspace:topic
-        Returns: (exchange_name, routing_key, class, durable)
-        """
-        if ':' not in queue_id:
-            return 'tg', queue_id, 'flow', False
+        Returns: (exchange_name, class, durable)

-        parts = queue_id.split(':', 2)
+        The exchange name encodes the full topic identity:
+            class:topicspace:topic  ->  topicspace.class.topic
+        """
+        if ':' not in topic_id:
+            return f'tg.flow.{topic_id}', 'flow', True
+
+        parts = topic_id.split(':', 2)
         if len(parts) != 3:
             raise ValueError(
-                f"Invalid queue format: {queue_id}, "
+                f"Invalid topic format: {topic_id}, "
                 f"expected class:topicspace:topic"
             )
@@ -356,36 +358,28 @@ class RabbitMQBackend:
             durable = False
         else:
             raise ValueError(
-                f"Invalid queue class: {cls}, "
+                f"Invalid topic class: {cls}, "
                 f"expected flow, request, response, or notify"
             )

-        # Exchange per topicspace, routing key includes class
-        exchange_name = topicspace
-        routing_key = f"{cls}.{topic}"
+        exchange_name = f"{topicspace}.{cls}.{topic}"

-        return exchange_name, routing_key, cls, durable
-
-    # Keep map_queue_name for backward compatibility with tests
-    def map_queue_name(self, queue_id: str) -> tuple[str, bool]:
-        exchange, routing_key, cls, durable = self._parse_queue_id(queue_id)
-        return f"{exchange}.{routing_key}", durable
+        return exchange_name, cls, durable

     def create_producer(self, topic: str, schema: type,
                         **options) -> BackendProducer:
-        exchange, routing_key, cls, durable = self._parse_queue_id(topic)
+        exchange, cls, durable = self._parse_topic(topic)
         logger.debug(
-            f"Creating producer: exchange={exchange}, "
-            f"routing_key={routing_key}"
+            f"Creating producer: exchange={exchange}"
         )
         return RabbitMQBackendProducer(
-            self._connection_params, exchange, routing_key, durable,
+            self._connection_params, exchange, durable,
         )

     def create_consumer(self, topic: str, subscription: str, schema: type,
                         initial_position: str = 'latest',
                         **options) -> BackendConsumer:
-        """Create a consumer with a queue bound to the topic exchange.
+        """Create a consumer with a queue bound to the topic's exchange.

         Behaviour is determined by the topic's class prefix:
         - flow: named durable queue, competing consumers (round-robin)
@@ -393,7 +387,7 @@ class RabbitMQBackend:
         - response: anonymous ephemeral queue, per-subscriber (auto-delete)
         - notify: anonymous ephemeral queue, per-subscriber (auto-delete)
         """
-        exchange, routing_key, cls, durable = self._parse_queue_id(topic)
+        exchange, cls, durable = self._parse_topic(topic)

         if cls in ('response', 'notify'):
             # Per-subscriber: anonymous queue, auto-deleted on disconnect
@@ -403,45 +397,33 @@ class RabbitMQBackend:
             auto_delete = True
         else:
             # Shared: named queue, competing consumers
-            queue_name = f"{exchange}.{routing_key}.{subscription}"
+            queue_name = f"{exchange}.{subscription}"
             queue_durable = durable
             exclusive = False
             auto_delete = False

         logger.debug(
             f"Creating consumer: exchange={exchange}, "
-            f"routing_key={routing_key}, queue={queue_name or '(anonymous)'}, "
-            f"cls={cls}"
+            f"queue={queue_name or '(anonymous)'}, cls={cls}"
         )
         return RabbitMQBackendConsumer(
-            self._connection_params, exchange, routing_key,
+            self._connection_params, exchange,
             queue_name, schema, queue_durable, exclusive, auto_delete,
         )
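To make the new naming scheme concrete, here is a standalone sketch of the mapping (mirroring `_parse_topic` and the consumer queue naming above, but not the backend's actual code; the helper names are illustrative):

```python
def exchange_name(topic_id: str) -> str:
    """class:topicspace:topic -> topicspace.class.topic"""
    cls, topicspace, topic = topic_id.split(":", 2)
    return f"{topicspace}.{cls}.{topic}"

def queue_name(topic_id: str, subscription: str):
    """Named queue for flow/request classes; None means the broker
    assigns an anonymous exclusive queue (response/notify classes)."""
    cls = topic_id.split(":", 1)[0]
    if cls in ("response", "notify"):
        return None  # anonymous auto-delete queue, broker names it
    return f"{exchange_name(topic_id)}.{subscription}"
```

This reproduces the spec's examples, e.g. `flow:tg:text-completion-request` maps to the exchange `tg.flow.text-completion-request`, and a subscription name is appended only for shared (competing-consumer) queues.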
-    def _create_queue_sync(self, exchange, routing_key, queue_name, durable):
-        """Blocking queue creation — run via asyncio.to_thread."""
+    def _create_topic_sync(self, exchange_name):
+        """Blocking exchange creation — run via asyncio.to_thread."""
         connection = None
         try:
             connection = pika.BlockingConnection(self._connection_params)
             channel = connection.channel()
             channel.exchange_declare(
-                exchange=exchange,
-                exchange_type='topic',
+                exchange=exchange_name,
+                exchange_type='fanout',
                 durable=True,
             )
-            channel.queue_declare(
-                queue=queue_name,
-                durable=durable,
-                exclusive=False,
-                auto_delete=False,
-            )
-            channel.queue_bind(
-                queue=queue_name,
-                exchange=exchange,
-                routing_key=routing_key,
-            )
-            logger.info(f"Created queue: {queue_name}")
+            logger.info(f"Created topic (exchange): {exchange_name}")
         finally:
             if connection and connection.is_open:
                 try:
@@ -449,34 +431,30 @@ class RabbitMQBackend:
                 except Exception:
                     pass
-    async def create_queue(self, topic: str, subscription: str) -> None:
-        """Pre-create a named queue bound to the topic exchange.
+    async def create_topic(self, topic: str) -> None:
+        """Create the fanout exchange for a logical topic.

-        Only applies to shared queues (flow/request class). Response and
-        notify queues are anonymous/auto-delete and created by consumers.
+        Only applies to flow and request class topics. Response and
+        notify exchanges are created on demand by consumers.
         """
-        exchange, routing_key, cls, durable = self._parse_queue_id(topic)
+        exchange, cls, durable = self._parse_topic(topic)
         if cls in ('response', 'notify'):
             return
-        queue_name = f"{exchange}.{routing_key}.{subscription}"
-        await asyncio.to_thread(
-            self._create_queue_sync, exchange, routing_key,
-            queue_name, durable,
-        )
+        await asyncio.to_thread(self._create_topic_sync, exchange)

-    def _delete_queue_sync(self, queue_name):
-        """Blocking queue deletion — run via asyncio.to_thread."""
+    def _delete_topic_sync(self, exchange_name):
+        """Blocking exchange deletion — run via asyncio.to_thread."""
         connection = None
         try:
             connection = pika.BlockingConnection(self._connection_params)
             channel = connection.channel()
-            channel.queue_delete(queue=queue_name)
-            logger.info(f"Deleted queue: {queue_name}")
+            channel.exchange_delete(exchange=exchange_name)
+            logger.info(f"Deleted topic (exchange): {exchange_name}")
         except Exception as e:
-            # Idempotent — queue may already be gone
-            logger.debug(f"Queue delete for {queue_name}: {e}")
+            # Idempotent — exchange may already be gone
+            logger.debug(f"Exchange delete for {exchange_name}: {e}")
         finally:
             if connection and connection.is_open:
                 try:
@@ -484,31 +462,27 @@ class RabbitMQBackend:
                 except Exception:
                     pass
-    async def delete_queue(self, topic: str, subscription: str) -> None:
-        """Delete a named queue and any messages it contains.
+    async def delete_topic(self, topic: str) -> None:
+        """Delete a topic's fanout exchange.

-        Only applies to shared queues (flow/request class). Response and
-        notify queues are anonymous/auto-delete and managed by the broker.
+        Consumer queues lose their binding and drain naturally.
         """
-        exchange, routing_key, cls, durable = self._parse_queue_id(topic)
-        if cls in ('response', 'notify'):
-            return
-        queue_name = f"{exchange}.{routing_key}.{subscription}"
-        await asyncio.to_thread(self._delete_queue_sync, queue_name)
+        exchange, cls, durable = self._parse_topic(topic)
+        await asyncio.to_thread(self._delete_topic_sync, exchange)

-    def _queue_exists_sync(self, queue_name):
-        """Blocking queue existence check — run via asyncio.to_thread.
+    def _topic_exists_sync(self, exchange_name):
+        """Blocking exchange existence check — run via asyncio.to_thread.

         Uses passive=True which checks without creating."""
         connection = None
         try:
             connection = pika.BlockingConnection(self._connection_params)
             channel = connection.channel()
-            channel.queue_declare(queue=queue_name, passive=True)
+            channel.exchange_declare(
+                exchange=exchange_name, passive=True,
+            )
             return True
         except pika.exceptions.ChannelClosedByBroker:
-            # 404 NOT_FOUND — queue does not exist
+            # 404 NOT_FOUND — exchange does not exist
             return False
         finally:
             if connection and connection.is_open:
                 try:
@@ -517,26 +491,25 @@ class RabbitMQBackend:
                 except Exception:
                     pass

-    async def queue_exists(self, topic: str, subscription: str) -> bool:
-        """Check whether a named queue exists.
+    async def topic_exists(self, topic: str) -> bool:
+        """Check whether a topic's exchange exists.

-        Only applies to shared queues (flow/request class). Response and
-        notify queues are anonymous/ephemeral — always returns False.
+        Only applies to flow and request class topics. Response and
+        notify topics are ephemeral — always returns False.
         """
-        exchange, routing_key, cls, durable = self._parse_queue_id(topic)
+        exchange, cls, durable = self._parse_topic(topic)
         if cls in ('response', 'notify'):
             return False
-        queue_name = f"{exchange}.{routing_key}.{subscription}"
         return await asyncio.to_thread(
-            self._queue_exists_sync, queue_name
+            self._topic_exists_sync, exchange
        )

-    async def ensure_queue(self, topic: str, subscription: str) -> None:
-        """Ensure a queue exists, creating it if necessary."""
-        if not await self.queue_exists(topic, subscription):
-            await self.create_queue(topic, subscription)
+    async def ensure_topic(self, topic: str) -> None:
+        """Ensure a topic exists, creating it if necessary."""
+        if not await self.topic_exists(topic):
+            await self.create_topic(topic)

     def close(self) -> None:
         pass


@@ -10,6 +10,8 @@ from prometheus_client import Histogram
 from ... schema import TextDocument, Chunk, Metadata, Triples
 from ... base import ChunkingService, ConsumerSpec, ProducerSpec

+RecursiveCharacterTextSplitter = None
+
 from ... provenance import (
     chunk_uri as make_chunk_uri, derived_entity_triples,
     set_graph, GRAPH_SOURCE,
@@ -41,8 +43,12 @@ class Processor(ChunkingService):
         self.default_chunk_size = chunk_size
         self.default_chunk_overlap = chunk_overlap

-        from langchain_text_splitters import RecursiveCharacterTextSplitter
-        self.RecursiveCharacterTextSplitter = RecursiveCharacterTextSplitter
+        global RecursiveCharacterTextSplitter
+        if RecursiveCharacterTextSplitter is None:
+            from langchain_text_splitters import (
+                RecursiveCharacterTextSplitter as _cls,
+            )
+            RecursiveCharacterTextSplitter = _cls

         if not hasattr(__class__, "chunk_metric"):
             __class__.chunk_metric = Histogram(


@@ -10,6 +10,8 @@ from prometheus_client import Histogram
 from ... schema import TextDocument, Chunk, Metadata, Triples
 from ... base import ChunkingService, ConsumerSpec, ProducerSpec

+TokenTextSplitter = None
+
 from ... provenance import (
     chunk_uri as make_chunk_uri, derived_entity_triples,
     set_graph, GRAPH_SOURCE,
@@ -41,8 +43,10 @@ class Processor(ChunkingService):
         self.default_chunk_size = chunk_size
         self.default_chunk_overlap = chunk_overlap

-        from langchain_text_splitters import TokenTextSplitter
-        self.TokenTextSplitter = TokenTextSplitter
+        global TokenTextSplitter
+        if TokenTextSplitter is None:
+            from langchain_text_splitters import TokenTextSplitter as _cls
+            TokenTextSplitter = _cls

         if not hasattr(__class__, "chunk_metric"):
             __class__.chunk_metric = Histogram(


@@ -124,9 +124,7 @@ class Processor(AsyncProcessor):

     async def start(self):

-        await self.pubsub.ensure_queue(
-            self.config_request_topic, self.config_request_subscriber
-        )
+        await self.pubsub.ensure_topic(self.config_request_topic)
         await self.push()  # Startup poke: empty types = everything
         await self.config_request_consumer.start()


@@ -119,9 +119,7 @@ class Processor(AsyncProcessor):

     async def start(self):

-        await self.pubsub.ensure_queue(
-            self.knowledge_request_topic, self.knowledge_request_subscriber
-        )
+        await self.pubsub.ensure_topic(self.knowledge_request_topic)
         await super(Processor, self).start()
         await self.knowledge_request_consumer.start()
         await self.knowledge_response_producer.start()


@@ -15,6 +15,9 @@ from ... schema import Document, TextDocument, Metadata
 from ... schema import librarian_request_queue, librarian_response_queue
 from ... schema import Triples
 from ... base import FlowProcessor, ConsumerSpec, ProducerSpec, LibrarianClient
+
+PyPDFLoader = None
+
 from ... provenance import (
     document_uri, page_uri as make_page_uri, derived_entity_triples,
     set_graph, GRAPH_SOURCE,
@@ -128,7 +131,12 @@ class Processor(FlowProcessor):
             fp.write(base64.b64decode(v.data))
             fp.close()

-            from langchain_community.document_loaders import PyPDFLoader
+            global PyPDFLoader
+            if PyPDFLoader is None:
+                from langchain_community.document_loaders import (
+                    PyPDFLoader as _cls,
+                )
+                PyPDFLoader = _cls
             loader = PyPDFLoader(temp_path)
             pages = loader.load()


@@ -7,7 +7,7 @@ import logging
 # Module logger
 logger = logging.getLogger(__name__)

-# Queue deletion retry settings
+# Topic deletion retry settings
 DELETE_RETRIES = 5
 DELETE_RETRY_DELAY = 2  # seconds

@@ -215,11 +215,11 @@ class FlowConfig:
             return result

-        # Pre-create flow-level queues so the data path is wired
+        # Pre-create topic exchanges so the data path is wired
         # before processors receive their config and start connecting.
-        queues = self._collect_flow_queues(cls, repl_template_with_params)
-        for topic, subscription in queues:
-            await self.pubsub.create_queue(topic, subscription)
+        topics = self._collect_flow_topics(cls, repl_template_with_params)
+        for topic in topics:
+            await self.pubsub.create_topic(topic)

         # Build all processor config updates, then write in a single batch.
         updates = []
@@ -283,8 +283,8 @@ class FlowConfig:
             error = None,
         )

-    async def ensure_existing_flow_queues(self):
-        """Ensure queues exist for all already-running flows.
+    async def ensure_existing_flow_topics(self):
+        """Ensure topics exist for all already-running flows.

         Called on startup to handle flows that were started before this
         version of the flow service was deployed, or before a restart.
@@ -315,7 +315,7 @@ class FlowConfig:
                 if blueprint_data is None:
                     logger.warning(
                         f"Blueprint '{blueprint_name}' not found for "
-                        f"flow '{flow_id}', skipping queue creation"
+                        f"flow '{flow_id}', skipping topic creation"
                     )
                     continue

@@ -333,65 +333,63 @@ class FlowConfig:
                     )
                     return result

-                queues = self._collect_flow_queues(cls, repl_template)
-                for topic, subscription in queues:
-                    await self.pubsub.ensure_queue(topic, subscription)
+                topics = self._collect_flow_topics(cls, repl_template)
+                for topic in topics:
+                    await self.pubsub.ensure_topic(topic)

                 logger.info(
-                    f"Ensured queues for existing flow '{flow_id}'"
+                    f"Ensured topics for existing flow '{flow_id}'"
                 )

             except Exception as e:
                 logger.error(
-                    f"Failed to ensure queues for flow '{flow_id}': {e}"
+                    f"Failed to ensure topics for flow '{flow_id}': {e}"
                 )

-    def _collect_flow_queues(self, cls, repl_template):
-        """Collect (topic, subscription) pairs for all flow-level queues.
+    def _collect_flow_topics(self, cls, repl_template):
+        """Collect unique topic identifiers from the blueprint.

-        Iterates the blueprint's "flow" section and reads only the
-        "topics" dict from each processor entry.
+        Iterates the blueprint's "flow" section and returns a
+        deduplicated set of resolved topic strings. The flow service
+        manages topic lifecycle (create/delete exchanges), not
+        individual consumer queues.
         """
-        queues = []
+        topics = set()
         for k, v in cls["flow"].items():
-            processor, variant = k.split(":", 1)
-            variant = repl_template(variant)
             for spec_name, topic_template in v.get("topics", {}).items():
                 topic = repl_template(topic_template)
-                subscription = f"{processor}--{variant}--{spec_name}"
-                queues.append((topic, subscription))
+                topics.add(topic)

-        return queues
+        return topics

-    async def _delete_queues(self, queues):
-        """Delete queues with retries. Best-effort — logs failures but
+    async def _delete_topics(self, topics):
+        """Delete topics with retries. Best-effort — logs failures but
         does not raise."""

         for attempt in range(DELETE_RETRIES):
             remaining = []
-            for topic, subscription in queues:
+            for topic in topics:
                 try:
-                    await self.pubsub.delete_queue(topic, subscription)
+                    await self.pubsub.delete_topic(topic)
                 except Exception as e:
                     logger.warning(
-                        f"Queue delete failed (attempt {attempt + 1}/"
+                        f"Topic delete failed (attempt {attempt + 1}/"
                         f"{DELETE_RETRIES}): {topic}: {e}"
                     )
-                    remaining.append((topic, subscription))
+                    remaining.append(topic)

             if not remaining:
                 return

-            queues = remaining
+            topics = remaining
             if attempt < DELETE_RETRIES - 1:
                 await asyncio.sleep(DELETE_RETRY_DELAY)

-        for topic, subscription in queues:
+        for topic in topics:
             logger.error(
-                f"Failed to delete queue after {DELETE_RETRIES} "
+                f"Failed to delete topic after {DELETE_RETRIES} "
                 f"attempts: {topic}"
             )

@@ -426,8 +424,8 @@ class FlowConfig:
                 result = result.replace(f"{{{param_name}}}", str(param_value))
             return result

-        # Collect queue identifiers before removing config
-        queues = self._collect_flow_queues(cls, repl_template)
+        # Collect topic identifiers before removing config
+        topics = self._collect_flow_topics(cls, repl_template)

         # Phase 1: Set status to "stopping" and remove processor config.
         # The config push tells processors to shut down their consumers.
@@ -448,8 +446,8 @@ class FlowConfig:
         await self.config.delete_many(deletes)

-        # Phase 2: Delete queues with retries, then remove the flow record.
-        await self._delete_queues(queues)
+        # Phase 2: Delete topics with retries, then remove the flow record.
+        await self._delete_topics(topics)

         if msg.flow_id in await self.config.keys("flow"):
             await self.config.delete("flow", msg.flow_id)
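The retry shape used by `_delete_topics` above (attempt every item, keep the failures, sleep, retry only those, and report survivors instead of raising) can be sketched synchronously; names here are illustrative, not the service's actual code:

```python
import time

def delete_with_retries(items, delete_fn, retries=5, delay=0):
    """Best-effort deletion: retry only the items that failed.
    Returns the list of items that never succeeded."""
    failed = list(items)
    for attempt in range(retries):
        still_failing = []
        for item in failed:
            try:
                delete_fn(item)
            except Exception:
                still_failing.append(item)
        if not still_failing:
            return []          # everything deleted
        failed = still_failing  # retry only the failures
        if attempt < retries - 1:
            time.sleep(delay)
    return failed  # survivors are reported to the caller, not raised
```

Retrying only the remaining items keeps the loop from re-deleting topics that already succeeded, which matters when deletion is not idempotent on the backend.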


@@ -101,11 +101,9 @@ class Processor(AsyncProcessor):

     async def start(self):

-        await self.pubsub.ensure_queue(
-            self.flow_request_topic, self.flow_request_subscriber
-        )
+        await self.pubsub.ensure_topic(self.flow_request_topic)
         await self.config_client.start()
-        await self.flow.ensure_existing_flow_queues()
+        await self.flow.ensure_existing_flow_topics()
         await self.flow_request_consumer.start()

     async def on_flow_request(self, msg, consumer, flow):


@@ -263,12 +263,8 @@ class Processor(AsyncProcessor):

     async def start(self):

-        await self.pubsub.ensure_queue(
-            self.librarian_request_topic, self.librarian_request_subscriber
-        )
-        await self.pubsub.ensure_queue(
-            self.collection_request_topic, self.collection_request_subscriber
-        )
+        await self.pubsub.ensure_topic(self.librarian_request_topic)
+        await self.pubsub.ensure_topic(self.collection_request_topic)
         await super(Processor, self).start()
         await self.librarian_request_consumer.start()
         await self.librarian_response_producer.start()


@ -18,6 +18,12 @@ import logging
# Module logger # Module logger
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from google import genai
from google.genai import types
from google.genai.types import HarmCategory, HarmBlockThreshold
from google.genai.errors import ClientError
from google.api_core.exceptions import ResourceExhausted
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult, LlmChunk from .... base import LlmService, LlmResult, LlmChunk
@@ -71,20 +77,20 @@ class Processor(LlmService):
         block_level = self.HarmBlockThreshold.BLOCK_ONLY_HIGH
 
         self.safety_settings = [
-            self.types.SafetySetting(
-                category = self.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
+            types.SafetySetting(
+                category = HarmCategory.HARM_CATEGORY_HATE_SPEECH,
                 threshold = block_level,
             ),
-            self.types.SafetySetting(
-                category = self.HarmCategory.HARM_CATEGORY_HARASSMENT,
+            types.SafetySetting(
+                category = HarmCategory.HARM_CATEGORY_HARASSMENT,
                 threshold = block_level,
             ),
-            self.types.SafetySetting(
-                category = self.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
+            types.SafetySetting(
+                category = HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
                 threshold = block_level,
             ),
-            self.types.SafetySetting(
-                category = self.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
+            types.SafetySetting(
+                category = HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
                 threshold = block_level,
             ),
             # There is a documentation conflict on whether or not
@@ -104,7 +110,7 @@ class Processor(LlmService):
         if cache_key not in self.generation_configs:
             logger.info(f"Creating generation config for '{model_name}' with temperature {effective_temperature}")
-            self.generation_configs[cache_key] = self.types.GenerateContentConfig(
+            self.generation_configs[cache_key] = types.GenerateContentConfig(
                 temperature = effective_temperature,
                 top_p = 1,
                 top_k = 40,
@ -160,7 +166,7 @@ class Processor(LlmService):
# Leave rate limit retries to the default handler # Leave rate limit retries to the default handler
raise TooManyRequests() raise TooManyRequests()
except self.ClientError as e: except ClientError as e:
# google-genai SDK throws ClientError for 4xx errors # google-genai SDK throws ClientError for 4xx errors
if e.code == 429: if e.code == 429:
logger.warning(f"Rate limit exceeded (ClientError 429): {e}") logger.warning(f"Rate limit exceeded (ClientError 429): {e}")
@ -229,11 +235,11 @@ class Processor(LlmService):
logger.debug("Streaming complete") logger.debug("Streaming complete")
except self.ResourceExhausted: except ResourceExhausted:
logger.warning("Rate limit exceeded during streaming") logger.warning("Rate limit exceeded during streaming")
raise TooManyRequests() raise TooManyRequests()
except self.ClientError as e: except ClientError as e:
# google-genai SDK throws ClientError for 4xx errors # google-genai SDK throws ClientError for 4xx errors
if e.code == 429: if e.code == 429:
logger.warning(f"Rate limit exceeded during streaming (ClientError 429): {e}") logger.warning(f"Rate limit exceeded during streaming (ClientError 429): {e}")
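The hunks above replace `self.`-attribute access with module-level imports but preserve the retry mapping: google-genai raises `ClientError` for all 4xx responses, and only status 429 should be converted to `TooManyRequests` so the default retry handler backs off. A self-contained sketch of that mapping (the `ClientError` and `TooManyRequests` classes here are stand-ins for `google.genai.errors.ClientError` and TrustGraph's exception, and `map_client_error` is an illustrative helper, not the actual implementation):

```python
class TooManyRequests(Exception):
    """Stand-in for TrustGraph's exceptions.TooManyRequests."""

class ClientError(Exception):
    """Stand-in for google.genai.errors.ClientError; the real one carries .code."""
    def __init__(self, code, message=""):
        super().__init__(message)
        self.code = code

def map_client_error(e):
    # Mirror the diff's handling: 429 becomes TooManyRequests so the
    # default retry handler can back off; other 4xx errors propagate.
    if isinstance(e, ClientError) and e.code == 429:
        raise TooManyRequests() from e
    raise e
```

Keeping non-429 `ClientError`s unmapped matters: a 400 or 404 is a caller bug and retrying it would loop forever.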