mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 08:26:21 +02:00
Adds a RabbitMQ backend as an alternative to Pulsar, selectable via PUBSUB_BACKEND=rabbitmq. Both backends implement the same PubSubBackend protocol — no application code changes needed to switch. RabbitMQ topology: - Single topic exchange per topicspace (e.g. 'tg') - Routing key derived from queue class and topic name - Shared consumers: named queue bound to exchange (competing, round-robin) - Exclusive consumers: anonymous auto-delete queue (broadcast, each gets every message). Used by Subscriber and config push consumer. - Thread-local producer connections (pika is not thread-safe) - Push-based consumption via basic_consume with process_data_events for heartbeat processing Consumer model changes: - Consumer class creates one backend consumer per concurrent task (required for pika thread safety, harmless for Pulsar) - Consumer class accepts consumer_type parameter - Subscriber passes consumer_type='exclusive' for broadcast semantics - Config push consumer uses consumer_type='exclusive' so every processor instance receives config updates - handle_one_from_queue receives consumer as parameter for correct per-connection ack/nack LibrarianClient: - New shared client class replacing duplicated librarian request-response code across 6+ services (chunking, decoders, RAG, etc.) 
- Uses stream-document instead of get-document-content for fetching document content in 1MB chunks (avoids broker message size limits) - Standalone object (self.librarian = LibrarianClient(...)) not a mixin - get-document-content marked deprecated in schema and OpenAPI spec Serialisation: - Extracted dataclass_to_dict/dict_to_dataclass to shared serialization.py (used by both Pulsar and RabbitMQ backends) Librarian queues: - Changed from flow class (persistent) back to request/response class now that stream-document eliminates large single messages - API upload chunk size reduced from 5MB to 3MB to stay under broker limits after base64 encoding Factory and CLI: - get_pubsub() handles 'rabbitmq' backend with RabbitMQ connection params - add_pubsub_args() includes RabbitMQ options (host, port, credentials) - add_pubsub_args(standalone=True) defaults to localhost for CLI tools - init_trustgraph skips Pulsar admin setup for non-Pulsar backends - tg-dump-queues and tg-monitor-prompts use backend abstraction - BaseClient and ConfigClient accept generic pubsub config
107 lines
3.6 KiB
Python
107 lines
3.6 KiB
Python
"""
|
|
Unit tests for RabbitMQ backend — queue name mapping and factory dispatch.
|
|
Does not require a running RabbitMQ instance.
|
|
"""
|
|
|
|
import pytest
|
|
import argparse
|
|
|
|
pika = pytest.importorskip("pika", reason="pika not installed")
|
|
|
|
from trustgraph.base.rabbitmq_backend import RabbitMQBackend
|
|
from trustgraph.base.pubsub import get_pubsub, add_pubsub_args
|
|
|
|
|
|
class TestRabbitMQMapQueueName:
    """Tests for ``RabbitMQBackend.map_queue_name``.

    Each test passes a queue identifier string to ``map_queue_name`` and
    checks the returned ``(name, durable)`` pair.
    """

    @pytest.fixture
    def backend(self):
        """Return a ``RabbitMQBackend`` created without running ``__init__``."""
        # object.__new__ bypasses the constructor, so the tests can call
        # map_queue_name without supplying any connection settings.
        return object.__new__(RabbitMQBackend)

    def test_flow_is_durable(self, backend):
        """A ``flow`` class identifier maps to a durable queue."""
        name, durable = backend.map_queue_name(
            'flow:tg:text-completion-request'
        )
        assert durable is True
        assert name == 'tg.flow.text-completion-request'

    def test_state_is_durable(self, backend):
        """A ``state`` class identifier maps to a durable queue."""
        name, durable = backend.map_queue_name('state:tg:config')
        assert durable is True
        assert name == 'tg.state.config'

    def test_request_is_not_durable(self, backend):
        """A ``request`` class identifier maps to a non-durable queue."""
        name, durable = backend.map_queue_name('request:tg:config')
        assert durable is False
        assert name == 'tg.request.config'

    def test_response_is_not_durable(self, backend):
        """A ``response`` class identifier maps to a non-durable queue."""
        name, durable = backend.map_queue_name('response:tg:librarian')
        assert durable is False
        assert name == 'tg.response.librarian'

    def test_custom_topicspace(self, backend):
        """The topicspace segment becomes the first part of the name."""
        name, durable = backend.map_queue_name('flow:prod:my-queue')
        assert name == 'prod.flow.my-queue'
        assert durable is True

    def test_no_colon_defaults_to_flow(self, backend):
        """An identifier without ':' maps to ``tg.<id>`` and is non-durable."""
        # NOTE(review): the test name says "defaults to flow", but the
        # expected name has no 'flow' segment and durable is False, unlike
        # the 'flow:' identifiers above -- confirm which is intended.
        name, durable = backend.map_queue_name('simple-queue')
        assert name == 'tg.simple-queue'
        assert durable is False

    def test_invalid_class_raises(self, backend):
        """An unrecognised queue class is rejected with ``ValueError``."""
        with pytest.raises(ValueError, match="Invalid queue class"):
            backend.map_queue_name('unknown:tg:topic')

    def test_flow_with_flow_suffix(self, backend):
        """Queue names with flow suffix (e.g. :default) are preserved."""
        name, durable = backend.map_queue_name('request:tg:prompt:default')
        assert name == 'tg.request.prompt:default'
        # The identifier is still 'request' class, so the same non-durable
        # expectation as test_request_is_not_durable applies here.
        assert durable is False
|
|
|
|
|
|
class TestGetPubsubRabbitMQ:
    """Tests that ``get_pubsub`` builds a RabbitMQ backend on request."""

    def test_factory_creates_rabbitmq_backend(self):
        """Selecting 'rabbitmq' with no other options yields the backend."""
        backend = get_pubsub(pubsub_backend='rabbitmq')
        assert isinstance(backend, RabbitMQBackend)

    def test_factory_passes_config(self):
        """RabbitMQ keyword options end up in the connection parameters."""
        backend = get_pubsub(
            pubsub_backend='rabbitmq',
            rabbitmq_host='myhost',
            rabbitmq_port=5673,
            rabbitmq_username='user',
            rabbitmq_password='pass',
            rabbitmq_vhost='/test',
        )
        assert isinstance(backend, RabbitMQBackend)

        # Verify connection params were set
        params = backend._connection_params
        assert params.host == 'myhost'
        assert params.port == 5673
        assert params.virtual_host == '/test'

        # The username and password are passed in above, so check they are
        # forwarded as well rather than silently dropped.
        # NOTE(review): assumes _connection_params is a pika parameters
        # object whose .credentials is a PlainCredentials -- confirm.
        assert params.credentials.username == 'user'
        assert params.credentials.password == 'pass'
|
|
|
|
|
|
class TestAddPubsubArgsRabbitMQ:
    """Tests for the RabbitMQ options registered by ``add_pubsub_args``."""

    def test_rabbitmq_args_present(self):
        """Explicit RabbitMQ flags are parsed onto the namespace."""
        cli = argparse.ArgumentParser()
        add_pubsub_args(cli)

        argv = [
            '--pubsub-backend', 'rabbitmq',
            '--rabbitmq-host', 'myhost',
            '--rabbitmq-port', '5673',
        ]
        parsed = cli.parse_args(argv)

        assert parsed.pubsub_backend == 'rabbitmq'
        assert parsed.rabbitmq_host == 'myhost'
        # The port is given as a string on the command line and is
        # expected back as an int.
        assert parsed.rabbitmq_port == 5673

    def test_rabbitmq_defaults_container(self):
        """With no flags, the RabbitMQ options take their default values."""
        cli = argparse.ArgumentParser()
        add_pubsub_args(cli)
        parsed = cli.parse_args([])

        # Checked in the same order as listed; the first mismatch fails.
        expected_defaults = (
            ('rabbitmq_host', 'rabbitmq'),
            ('rabbitmq_port', 5672),
            ('rabbitmq_username', 'guest'),
            ('rabbitmq_password', 'guest'),
            ('rabbitmq_vhost', '/'),
        )
        for option, default in expected_defaults:
            assert getattr(parsed, option) == default
|