mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-06-30 17:09:38 +02:00
refactor: use one fanout exchange per topic instead of shared topic exchange (#827)
The RabbitMQ backend used a single topic exchange per topicspace with routing keys to differentiate logical topics. This meant the flow service had to manually create named queues for every processor-topic pair, including producer-side topics — creating phantom queues that accumulated unread message copies indefinitely. Replace with one fanout exchange per logical topic. Consumers now declare and bind their own queues on connect. The flow service manages topic lifecycle (create/delete exchanges) rather than queue lifecycle, and only collects unique topic identifiers instead of per-processor (topic, subscription) pairs. Backend API: create_queue/delete_queue/ensure_queue replaced with create_topic/delete_topic/ensure_topic (subscription parameter removed).
This commit is contained in:
parent
391b9076f3
commit
3505bfdd25
9 changed files with 190 additions and 228 deletions
|
|
@ -1,5 +1,5 @@
|
||||||
"""
|
"""
|
||||||
Unit tests for RabbitMQ backend — queue name mapping and factory dispatch.
|
Unit tests for RabbitMQ backend — topic parsing and factory dispatch.
|
||||||
Does not require a running RabbitMQ instance.
|
Does not require a running RabbitMQ instance.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
@ -12,7 +12,7 @@ from trustgraph.base.rabbitmq_backend import RabbitMQBackend
|
||||||
from trustgraph.base.pubsub import get_pubsub, add_pubsub_args
|
from trustgraph.base.pubsub import get_pubsub, add_pubsub_args
|
||||||
|
|
||||||
|
|
||||||
class TestRabbitMQMapQueueName:
|
class TestRabbitMQParseTopic:
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def backend(self):
|
def backend(self):
|
||||||
|
|
@ -20,43 +20,48 @@ class TestRabbitMQMapQueueName:
|
||||||
return b
|
return b
|
||||||
|
|
||||||
def test_flow_is_durable(self, backend):
|
def test_flow_is_durable(self, backend):
|
||||||
name, durable = backend.map_queue_name('flow:tg:text-completion-request')
|
exchange, cls, durable = backend._parse_topic('flow:tg:text-completion-request')
|
||||||
assert durable is True
|
assert durable is True
|
||||||
assert name == 'tg.flow.text-completion-request'
|
assert cls == 'flow'
|
||||||
|
assert exchange == 'tg.flow.text-completion-request'
|
||||||
|
|
||||||
def test_notify_is_not_durable(self, backend):
|
def test_notify_is_not_durable(self, backend):
|
||||||
name, durable = backend.map_queue_name('notify:tg:config')
|
exchange, cls, durable = backend._parse_topic('notify:tg:config')
|
||||||
assert durable is False
|
assert durable is False
|
||||||
assert name == 'tg.notify.config'
|
assert cls == 'notify'
|
||||||
|
assert exchange == 'tg.notify.config'
|
||||||
|
|
||||||
def test_request_is_not_durable(self, backend):
|
def test_request_is_not_durable(self, backend):
|
||||||
name, durable = backend.map_queue_name('request:tg:config')
|
exchange, cls, durable = backend._parse_topic('request:tg:config')
|
||||||
assert durable is False
|
assert durable is False
|
||||||
assert name == 'tg.request.config'
|
assert cls == 'request'
|
||||||
|
assert exchange == 'tg.request.config'
|
||||||
|
|
||||||
def test_response_is_not_durable(self, backend):
|
def test_response_is_not_durable(self, backend):
|
||||||
name, durable = backend.map_queue_name('response:tg:librarian')
|
exchange, cls, durable = backend._parse_topic('response:tg:librarian')
|
||||||
assert durable is False
|
assert durable is False
|
||||||
assert name == 'tg.response.librarian'
|
assert cls == 'response'
|
||||||
|
assert exchange == 'tg.response.librarian'
|
||||||
|
|
||||||
def test_custom_topicspace(self, backend):
|
def test_custom_topicspace(self, backend):
|
||||||
name, durable = backend.map_queue_name('flow:prod:my-queue')
|
exchange, cls, durable = backend._parse_topic('flow:prod:my-queue')
|
||||||
assert name == 'prod.flow.my-queue'
|
assert exchange == 'prod.flow.my-queue'
|
||||||
assert durable is True
|
assert durable is True
|
||||||
|
|
||||||
def test_no_colon_defaults_to_flow(self, backend):
|
def test_no_colon_defaults_to_flow(self, backend):
|
||||||
name, durable = backend.map_queue_name('simple-queue')
|
exchange, cls, durable = backend._parse_topic('simple-queue')
|
||||||
assert name == 'tg.simple-queue'
|
assert exchange == 'tg.flow.simple-queue'
|
||||||
assert durable is False
|
assert cls == 'flow'
|
||||||
|
assert durable is True
|
||||||
|
|
||||||
def test_invalid_class_raises(self, backend):
|
def test_invalid_class_raises(self, backend):
|
||||||
with pytest.raises(ValueError, match="Invalid queue class"):
|
with pytest.raises(ValueError, match="Invalid topic class"):
|
||||||
backend.map_queue_name('unknown:tg:topic')
|
backend._parse_topic('unknown:tg:topic')
|
||||||
|
|
||||||
def test_flow_with_flow_suffix(self, backend):
|
def test_topic_with_flow_suffix(self, backend):
|
||||||
"""Queue names with flow suffix (e.g. :default) are preserved."""
|
"""Topic names with flow suffix (e.g. :default) are preserved."""
|
||||||
name, durable = backend.map_queue_name('request:tg:prompt:default')
|
exchange, cls, durable = backend._parse_topic('request:tg:prompt:default')
|
||||||
assert name == 'tg.request.prompt:default'
|
assert exchange == 'tg.request.prompt:default'
|
||||||
|
|
||||||
|
|
||||||
class TestGetPubsubRabbitMQ:
|
class TestGetPubsubRabbitMQ:
|
||||||
|
|
|
||||||
|
|
@ -121,7 +121,7 @@ class PubSubBackend(Protocol):
|
||||||
Create a producer for a topic.
|
Create a producer for a topic.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
topic: Generic topic format (qos/tenant/namespace/queue)
|
topic: Queue identifier in class:topicspace:topic format
|
||||||
schema: Dataclass type for messages
|
schema: Dataclass type for messages
|
||||||
**options: Backend-specific options (e.g., chunking_enabled)
|
**options: Backend-specific options (e.g., chunking_enabled)
|
||||||
|
|
||||||
|
|
@ -159,59 +159,55 @@ class PubSubBackend(Protocol):
|
||||||
"""
|
"""
|
||||||
...
|
...
|
||||||
|
|
||||||
async def create_queue(self, topic: str, subscription: str) -> None:
|
async def create_topic(self, topic: str) -> None:
|
||||||
"""
|
"""
|
||||||
Pre-create a queue so it exists before any consumer connects.
|
Create the broker-side resources for a logical topic.
|
||||||
|
|
||||||
The topic and subscription together identify the queue, mirroring
|
For RabbitMQ this creates a fanout exchange. For Pulsar this is
|
||||||
create_consumer where the queue name is derived from both.
|
a no-op (topics auto-create on first use).
|
||||||
|
|
||||||
Idempotent — creating an already-existing queue succeeds silently.
|
Idempotent — creating an already-existing topic succeeds silently.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
topic: Queue identifier in class:topicspace:topic format
|
topic: Topic identifier in class:topicspace:topic format
|
||||||
subscription: Subscription/consumer group name
|
|
||||||
"""
|
"""
|
||||||
...
|
...
|
||||||
|
|
||||||
async def delete_queue(self, topic: str, subscription: str) -> None:
|
async def delete_topic(self, topic: str) -> None:
|
||||||
"""
|
"""
|
||||||
Delete a queue and any messages it contains.
|
Delete a topic and discard any in-flight messages.
|
||||||
|
|
||||||
The topic and subscription together identify the queue, mirroring
|
For RabbitMQ this deletes the fanout exchange; consumer queues
|
||||||
create_consumer where the queue name is derived from both.
|
lose their binding and drain naturally.
|
||||||
|
|
||||||
Idempotent — deleting a non-existent queue succeeds silently.
|
Idempotent — deleting a non-existent topic succeeds silently.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
topic: Queue identifier in class:topicspace:topic format
|
topic: Topic identifier in class:topicspace:topic format
|
||||||
subscription: Subscription/consumer group name
|
|
||||||
"""
|
"""
|
||||||
...
|
...
|
||||||
|
|
||||||
async def queue_exists(self, topic: str, subscription: str) -> bool:
|
async def topic_exists(self, topic: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Check whether a queue exists.
|
Check whether a topic exists.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
topic: Queue identifier in class:topicspace:topic format
|
topic: Topic identifier in class:topicspace:topic format
|
||||||
subscription: Subscription/consumer group name
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True if the queue exists, False otherwise.
|
True if the topic exists, False otherwise.
|
||||||
"""
|
"""
|
||||||
...
|
...
|
||||||
|
|
||||||
async def ensure_queue(self, topic: str, subscription: str) -> None:
|
async def ensure_topic(self, topic: str) -> None:
|
||||||
"""
|
"""
|
||||||
Ensure a queue exists, creating it if necessary.
|
Ensure a topic exists, creating it if necessary.
|
||||||
|
|
||||||
Convenience wrapper — checks existence, creates if missing.
|
Convenience wrapper — checks existence, creates if missing.
|
||||||
Used by system services on startup.
|
Used by the flow service and system services on startup.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
topic: Queue identifier in class:topicspace:topic format
|
topic: Topic identifier in class:topicspace:topic format
|
||||||
subscription: Subscription/consumer group name
|
|
||||||
"""
|
"""
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -266,22 +266,22 @@ class PulsarBackend:
|
||||||
|
|
||||||
return PulsarBackendConsumer(pulsar_consumer, schema)
|
return PulsarBackendConsumer(pulsar_consumer, schema)
|
||||||
|
|
||||||
async def create_queue(self, topic: str, subscription: str) -> None:
|
async def create_topic(self, topic: str) -> None:
|
||||||
"""No-op — Pulsar auto-creates topics on first use.
|
"""No-op — Pulsar auto-creates topics on first use.
|
||||||
TODO: Use admin REST API for explicit persistent topic creation."""
|
TODO: Use admin REST API for explicit persistent topic creation."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def delete_queue(self, topic: str, subscription: str) -> None:
|
async def delete_topic(self, topic: str) -> None:
|
||||||
"""No-op — to be replaced with admin REST API calls.
|
"""No-op — to be replaced with admin REST API calls.
|
||||||
TODO: Delete subscription and persistent topic via admin API."""
|
TODO: Delete persistent topic via admin API."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def queue_exists(self, topic: str, subscription: str) -> bool:
|
async def topic_exists(self, topic: str) -> bool:
|
||||||
"""Returns True — Pulsar auto-creates on subscribe.
|
"""Returns True — Pulsar auto-creates on subscribe.
|
||||||
TODO: Use admin REST API for actual existence check."""
|
TODO: Use admin REST API for actual existence check."""
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def ensure_queue(self, topic: str, subscription: str) -> None:
|
async def ensure_topic(self, topic: str) -> None:
|
||||||
"""No-op — Pulsar auto-creates topics on first use.
|
"""No-op — Pulsar auto-creates topics on first use.
|
||||||
TODO: Use admin REST API for explicit creation."""
|
TODO: Use admin REST API for explicit creation."""
|
||||||
pass
|
pass
|
||||||
|
|
|
||||||
|
|
@ -1,22 +1,24 @@
|
||||||
"""
|
"""
|
||||||
RabbitMQ backend implementation for pub/sub abstraction.
|
RabbitMQ backend implementation for pub/sub abstraction.
|
||||||
|
|
||||||
Uses a single topic exchange per topicspace. The logical queue name
|
Each logical topic maps to its own fanout exchange. The exchange name
|
||||||
becomes the routing key. Consumer behavior is determined by the
|
encodes the full topic identity:
|
||||||
subscription name:
|
|
||||||
|
|
||||||
- Same subscription + same topic = shared queue (competing consumers)
|
class:topicspace:topic → exchange topicspace.class.topic
|
||||||
- Different subscriptions = separate queues (broadcast / fan-out)
|
|
||||||
|
|
||||||
This mirrors Pulsar's subscription model using idiomatic RabbitMQ.
|
Producers publish to the exchange with an empty routing key.
|
||||||
|
Consumers declare and bind their own queues:
|
||||||
|
|
||||||
|
- flow / request: named durable/non-durable queue (competing consumers)
|
||||||
|
- response / notify: anonymous exclusive auto-delete queue (per-subscriber)
|
||||||
|
|
||||||
|
The flow service manages topic lifecycle (create/delete exchanges).
|
||||||
|
Consumers manage their own queue lifecycle (declare + bind on connect).
|
||||||
|
|
||||||
Architecture:
|
Architecture:
|
||||||
Producer --> [tg exchange] --routing key--> [named queue] --> Consumer
|
Producer --> [fanout exchange] --> [named queue] --> Consumer
|
||||||
--routing key--> [named queue] --> Consumer
|
--> [named queue] --> Consumer
|
||||||
--routing key--> [exclusive q] --> Subscriber
|
--> [exclusive queue] --> Subscriber
|
||||||
|
|
||||||
Uses basic_consume (push) instead of basic_get (polling) for
|
|
||||||
efficient message delivery.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
@ -58,18 +60,16 @@ class RabbitMQMessage:
|
||||||
|
|
||||||
|
|
||||||
class RabbitMQBackendProducer:
|
class RabbitMQBackendProducer:
|
||||||
"""Publishes messages to a topic exchange with a routing key.
|
"""Publishes messages to a fanout exchange.
|
||||||
|
|
||||||
Uses thread-local connections so each thread gets its own
|
Uses thread-local connections so each thread gets its own
|
||||||
connection/channel. This avoids wire corruption from concurrent
|
connection/channel. This avoids wire corruption from concurrent
|
||||||
threads writing to the same socket (pika is not thread-safe).
|
threads writing to the same socket (pika is not thread-safe).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, connection_params, exchange_name, routing_key,
|
def __init__(self, connection_params, exchange_name, durable):
|
||||||
durable):
|
|
||||||
self._connection_params = connection_params
|
self._connection_params = connection_params
|
||||||
self._exchange_name = exchange_name
|
self._exchange_name = exchange_name
|
||||||
self._routing_key = routing_key
|
|
||||||
self._durable = durable
|
self._durable = durable
|
||||||
self._local = threading.local()
|
self._local = threading.local()
|
||||||
|
|
||||||
|
|
@ -90,7 +90,7 @@ class RabbitMQBackendProducer:
|
||||||
chan = conn.channel()
|
chan = conn.channel()
|
||||||
chan.exchange_declare(
|
chan.exchange_declare(
|
||||||
exchange=self._exchange_name,
|
exchange=self._exchange_name,
|
||||||
exchange_type='topic',
|
exchange_type='fanout',
|
||||||
durable=True,
|
durable=True,
|
||||||
)
|
)
|
||||||
self._local.connection = conn
|
self._local.connection = conn
|
||||||
|
|
@ -113,7 +113,7 @@ class RabbitMQBackendProducer:
|
||||||
channel = self._get_channel()
|
channel = self._get_channel()
|
||||||
channel.basic_publish(
|
channel.basic_publish(
|
||||||
exchange=self._exchange_name,
|
exchange=self._exchange_name,
|
||||||
routing_key=self._routing_key,
|
routing_key='',
|
||||||
body=json_data.encode('utf-8'),
|
body=json_data.encode('utf-8'),
|
||||||
properties=amqp_properties,
|
properties=amqp_properties,
|
||||||
)
|
)
|
||||||
|
|
@ -144,19 +144,17 @@ class RabbitMQBackendProducer:
|
||||||
|
|
||||||
|
|
||||||
class RabbitMQBackendConsumer:
|
class RabbitMQBackendConsumer:
|
||||||
"""Consumes from a queue bound to a topic exchange.
|
"""Consumes from a queue bound to a fanout exchange.
|
||||||
|
|
||||||
Uses basic_consume (push model) with messages delivered to an
|
Uses basic_consume (push model) with messages delivered to an
|
||||||
internal thread-safe queue. process_data_events() drives both
|
internal thread-safe queue. process_data_events() drives both
|
||||||
message delivery and heartbeat processing.
|
message delivery and heartbeat processing.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, connection_params, exchange_name, routing_key,
|
def __init__(self, connection_params, exchange_name, queue_name,
|
||||||
queue_name, schema_cls, durable, exclusive=False,
|
schema_cls, durable, exclusive=False, auto_delete=False):
|
||||||
auto_delete=False):
|
|
||||||
self._connection_params = connection_params
|
self._connection_params = connection_params
|
||||||
self._exchange_name = exchange_name
|
self._exchange_name = exchange_name
|
||||||
self._routing_key = routing_key
|
|
||||||
self._queue_name = queue_name
|
self._queue_name = queue_name
|
||||||
self._schema_cls = schema_cls
|
self._schema_cls = schema_cls
|
||||||
self._durable = durable
|
self._durable = durable
|
||||||
|
|
@ -171,17 +169,16 @@ class RabbitMQBackendConsumer:
|
||||||
self._connection = pika.BlockingConnection(self._connection_params)
|
self._connection = pika.BlockingConnection(self._connection_params)
|
||||||
self._channel = self._connection.channel()
|
self._channel = self._connection.channel()
|
||||||
|
|
||||||
# Declare the topic exchange (idempotent, also done by producers)
|
# Declare the fanout exchange (idempotent)
|
||||||
self._channel.exchange_declare(
|
self._channel.exchange_declare(
|
||||||
exchange=self._exchange_name,
|
exchange=self._exchange_name,
|
||||||
exchange_type='topic',
|
exchange_type='fanout',
|
||||||
durable=True,
|
durable=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
if self._exclusive:
|
if self._exclusive:
|
||||||
# Anonymous ephemeral queue (response/notify class).
|
# Anonymous ephemeral queue (response/notify class).
|
||||||
# These are per-consumer and must be created here — the
|
# Per-consumer, broker assigns the name.
|
||||||
# broker assigns the name.
|
|
||||||
result = self._channel.queue_declare(
|
result = self._channel.queue_declare(
|
||||||
queue='',
|
queue='',
|
||||||
durable=False,
|
durable=False,
|
||||||
|
|
@ -189,20 +186,22 @@ class RabbitMQBackendConsumer:
|
||||||
auto_delete=True,
|
auto_delete=True,
|
||||||
)
|
)
|
||||||
self._queue_name = result.method.queue
|
self._queue_name = result.method.queue
|
||||||
|
|
||||||
self._channel.queue_bind(
|
|
||||||
queue=self._queue_name,
|
|
||||||
exchange=self._exchange_name,
|
|
||||||
routing_key=self._routing_key,
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
# Named queue (flow/request class). Queue must already
|
# Named queue (flow/request class).
|
||||||
# exist — created by the flow service or ensure_queue.
|
# Consumer owns its queue — declare and bind here.
|
||||||
# We just verify it exists and bind to consume.
|
|
||||||
self._channel.queue_declare(
|
self._channel.queue_declare(
|
||||||
queue=self._queue_name, passive=True,
|
queue=self._queue_name,
|
||||||
|
durable=self._durable,
|
||||||
|
exclusive=False,
|
||||||
|
auto_delete=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Bind queue to the fanout exchange
|
||||||
|
self._channel.queue_bind(
|
||||||
|
queue=self._queue_name,
|
||||||
|
exchange=self._exchange_name,
|
||||||
|
)
|
||||||
|
|
||||||
self._channel.basic_qos(prefetch_count=1)
|
self._channel.basic_qos(prefetch_count=1)
|
||||||
|
|
||||||
# Register push-based consumer
|
# Register push-based consumer
|
||||||
|
|
@ -318,7 +317,7 @@ class RabbitMQBackendConsumer:
|
||||||
|
|
||||||
|
|
||||||
class RabbitMQBackend:
|
class RabbitMQBackend:
|
||||||
"""RabbitMQ pub/sub backend using a topic exchange per topicspace."""
|
"""RabbitMQ pub/sub backend using one fanout exchange per topic."""
|
||||||
|
|
||||||
def __init__(self, host='localhost', port=5672, username='guest',
|
def __init__(self, host='localhost', port=5672, username='guest',
|
||||||
password='guest', vhost='/'):
|
password='guest', vhost='/'):
|
||||||
|
|
@ -331,20 +330,23 @@ class RabbitMQBackend:
|
||||||
)
|
)
|
||||||
logger.info(f"RabbitMQ backend: {host}:{port} vhost={vhost}")
|
logger.info(f"RabbitMQ backend: {host}:{port} vhost={vhost}")
|
||||||
|
|
||||||
def _parse_queue_id(self, queue_id: str) -> tuple[str, str, str, bool]:
|
def _parse_topic(self, topic_id: str) -> tuple[str, str, bool]:
|
||||||
"""
|
"""
|
||||||
Parse queue identifier into exchange, routing key, and durability.
|
Parse topic identifier into exchange name and durability.
|
||||||
|
|
||||||
Format: class:topicspace:topic
|
Format: class:topicspace:topic
|
||||||
Returns: (exchange_name, routing_key, class, durable)
|
Returns: (exchange_name, class, durable)
|
||||||
"""
|
|
||||||
if ':' not in queue_id:
|
|
||||||
return 'tg', queue_id, 'flow', False
|
|
||||||
|
|
||||||
parts = queue_id.split(':', 2)
|
The exchange name encodes the full topic identity:
|
||||||
|
class:topicspace:topic → topicspace.class.topic
|
||||||
|
"""
|
||||||
|
if ':' not in topic_id:
|
||||||
|
return f'tg.flow.{topic_id}', 'flow', True
|
||||||
|
|
||||||
|
parts = topic_id.split(':', 2)
|
||||||
if len(parts) != 3:
|
if len(parts) != 3:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Invalid queue format: {queue_id}, "
|
f"Invalid topic format: {topic_id}, "
|
||||||
f"expected class:topicspace:topic"
|
f"expected class:topicspace:topic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -356,36 +358,28 @@ class RabbitMQBackend:
|
||||||
durable = False
|
durable = False
|
||||||
else:
|
else:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Invalid queue class: {cls}, "
|
f"Invalid topic class: {cls}, "
|
||||||
f"expected flow, request, response, or notify"
|
f"expected flow, request, response, or notify"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Exchange per topicspace, routing key includes class
|
exchange_name = f"{topicspace}.{cls}.{topic}"
|
||||||
exchange_name = topicspace
|
|
||||||
routing_key = f"{cls}.{topic}"
|
|
||||||
|
|
||||||
return exchange_name, routing_key, cls, durable
|
return exchange_name, cls, durable
|
||||||
|
|
||||||
# Keep map_queue_name for backward compatibility with tests
|
|
||||||
def map_queue_name(self, queue_id: str) -> tuple[str, bool]:
|
|
||||||
exchange, routing_key, cls, durable = self._parse_queue_id(queue_id)
|
|
||||||
return f"{exchange}.{routing_key}", durable
|
|
||||||
|
|
||||||
def create_producer(self, topic: str, schema: type,
|
def create_producer(self, topic: str, schema: type,
|
||||||
**options) -> BackendProducer:
|
**options) -> BackendProducer:
|
||||||
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
|
exchange, cls, durable = self._parse_topic(topic)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Creating producer: exchange={exchange}, "
|
f"Creating producer: exchange={exchange}"
|
||||||
f"routing_key={routing_key}"
|
|
||||||
)
|
)
|
||||||
return RabbitMQBackendProducer(
|
return RabbitMQBackendProducer(
|
||||||
self._connection_params, exchange, routing_key, durable,
|
self._connection_params, exchange, durable,
|
||||||
)
|
)
|
||||||
|
|
||||||
def create_consumer(self, topic: str, subscription: str, schema: type,
|
def create_consumer(self, topic: str, subscription: str, schema: type,
|
||||||
initial_position: str = 'latest',
|
initial_position: str = 'latest',
|
||||||
**options) -> BackendConsumer:
|
**options) -> BackendConsumer:
|
||||||
"""Create a consumer with a queue bound to the topic exchange.
|
"""Create a consumer with a queue bound to the topic's exchange.
|
||||||
|
|
||||||
Behaviour is determined by the topic's class prefix:
|
Behaviour is determined by the topic's class prefix:
|
||||||
- flow: named durable queue, competing consumers (round-robin)
|
- flow: named durable queue, competing consumers (round-robin)
|
||||||
|
|
@ -393,7 +387,7 @@ class RabbitMQBackend:
|
||||||
- response: anonymous ephemeral queue, per-subscriber (auto-delete)
|
- response: anonymous ephemeral queue, per-subscriber (auto-delete)
|
||||||
- notify: anonymous ephemeral queue, per-subscriber (auto-delete)
|
- notify: anonymous ephemeral queue, per-subscriber (auto-delete)
|
||||||
"""
|
"""
|
||||||
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
|
exchange, cls, durable = self._parse_topic(topic)
|
||||||
|
|
||||||
if cls in ('response', 'notify'):
|
if cls in ('response', 'notify'):
|
||||||
# Per-subscriber: anonymous queue, auto-deleted on disconnect
|
# Per-subscriber: anonymous queue, auto-deleted on disconnect
|
||||||
|
|
@ -403,45 +397,33 @@ class RabbitMQBackend:
|
||||||
auto_delete = True
|
auto_delete = True
|
||||||
else:
|
else:
|
||||||
# Shared: named queue, competing consumers
|
# Shared: named queue, competing consumers
|
||||||
queue_name = f"{exchange}.{routing_key}.{subscription}"
|
queue_name = f"{exchange}.{subscription}"
|
||||||
queue_durable = durable
|
queue_durable = durable
|
||||||
exclusive = False
|
exclusive = False
|
||||||
auto_delete = False
|
auto_delete = False
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Creating consumer: exchange={exchange}, "
|
f"Creating consumer: exchange={exchange}, "
|
||||||
f"routing_key={routing_key}, queue={queue_name or '(anonymous)'}, "
|
f"queue={queue_name or '(anonymous)'}, cls={cls}"
|
||||||
f"cls={cls}"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return RabbitMQBackendConsumer(
|
return RabbitMQBackendConsumer(
|
||||||
self._connection_params, exchange, routing_key,
|
self._connection_params, exchange,
|
||||||
queue_name, schema, queue_durable, exclusive, auto_delete,
|
queue_name, schema, queue_durable, exclusive, auto_delete,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _create_queue_sync(self, exchange, routing_key, queue_name, durable):
|
def _create_topic_sync(self, exchange_name):
|
||||||
"""Blocking queue creation — run via asyncio.to_thread."""
|
"""Blocking exchange creation — run via asyncio.to_thread."""
|
||||||
connection = None
|
connection = None
|
||||||
try:
|
try:
|
||||||
connection = pika.BlockingConnection(self._connection_params)
|
connection = pika.BlockingConnection(self._connection_params)
|
||||||
channel = connection.channel()
|
channel = connection.channel()
|
||||||
channel.exchange_declare(
|
channel.exchange_declare(
|
||||||
exchange=exchange,
|
exchange=exchange_name,
|
||||||
exchange_type='topic',
|
exchange_type='fanout',
|
||||||
durable=True,
|
durable=True,
|
||||||
)
|
)
|
||||||
channel.queue_declare(
|
logger.info(f"Created topic (exchange): {exchange_name}")
|
||||||
queue=queue_name,
|
|
||||||
durable=durable,
|
|
||||||
exclusive=False,
|
|
||||||
auto_delete=False,
|
|
||||||
)
|
|
||||||
channel.queue_bind(
|
|
||||||
queue=queue_name,
|
|
||||||
exchange=exchange,
|
|
||||||
routing_key=routing_key,
|
|
||||||
)
|
|
||||||
logger.info(f"Created queue: {queue_name}")
|
|
||||||
finally:
|
finally:
|
||||||
if connection and connection.is_open:
|
if connection and connection.is_open:
|
||||||
try:
|
try:
|
||||||
|
|
@ -449,34 +431,30 @@ class RabbitMQBackend:
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def create_queue(self, topic: str, subscription: str) -> None:
|
async def create_topic(self, topic: str) -> None:
|
||||||
"""Pre-create a named queue bound to the topic exchange.
|
"""Create the fanout exchange for a logical topic.
|
||||||
|
|
||||||
Only applies to shared queues (flow/request class). Response and
|
Only applies to flow and request class topics. Response and
|
||||||
notify queues are anonymous/auto-delete and created by consumers.
|
notify exchanges are created on demand by consumers.
|
||||||
"""
|
"""
|
||||||
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
|
exchange, cls, durable = self._parse_topic(topic)
|
||||||
|
|
||||||
if cls in ('response', 'notify'):
|
if cls in ('response', 'notify'):
|
||||||
return
|
return
|
||||||
|
|
||||||
queue_name = f"{exchange}.{routing_key}.{subscription}"
|
await asyncio.to_thread(self._create_topic_sync, exchange)
|
||||||
await asyncio.to_thread(
|
|
||||||
self._create_queue_sync, exchange, routing_key,
|
|
||||||
queue_name, durable,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _delete_queue_sync(self, queue_name):
|
def _delete_topic_sync(self, exchange_name):
|
||||||
"""Blocking queue deletion — run via asyncio.to_thread."""
|
"""Blocking exchange deletion — run via asyncio.to_thread."""
|
||||||
connection = None
|
connection = None
|
||||||
try:
|
try:
|
||||||
connection = pika.BlockingConnection(self._connection_params)
|
connection = pika.BlockingConnection(self._connection_params)
|
||||||
channel = connection.channel()
|
channel = connection.channel()
|
||||||
channel.queue_delete(queue=queue_name)
|
channel.exchange_delete(exchange=exchange_name)
|
||||||
logger.info(f"Deleted queue: {queue_name}")
|
logger.info(f"Deleted topic (exchange): {exchange_name}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Idempotent — queue may already be gone
|
# Idempotent — exchange may already be gone
|
||||||
logger.debug(f"Queue delete for {queue_name}: {e}")
|
logger.debug(f"Exchange delete for {exchange_name}: {e}")
|
||||||
finally:
|
finally:
|
||||||
if connection and connection.is_open:
|
if connection and connection.is_open:
|
||||||
try:
|
try:
|
||||||
|
|
@ -484,31 +462,27 @@ class RabbitMQBackend:
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def delete_queue(self, topic: str, subscription: str) -> None:
|
async def delete_topic(self, topic: str) -> None:
|
||||||
"""Delete a named queue and any messages it contains.
|
"""Delete a topic's fanout exchange.
|
||||||
|
|
||||||
Only applies to shared queues (flow/request class). Response and
|
Consumer queues lose their binding and drain naturally.
|
||||||
notify queues are anonymous/auto-delete and managed by the broker.
|
|
||||||
"""
|
"""
|
||||||
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
|
exchange, cls, durable = self._parse_topic(topic)
|
||||||
|
await asyncio.to_thread(self._delete_topic_sync, exchange)
|
||||||
|
|
||||||
if cls in ('response', 'notify'):
|
def _topic_exists_sync(self, exchange_name):
|
||||||
return
|
"""Blocking exchange existence check — run via asyncio.to_thread.
|
||||||
|
|
||||||
queue_name = f"{exchange}.{routing_key}.{subscription}"
|
|
||||||
await asyncio.to_thread(self._delete_queue_sync, queue_name)
|
|
||||||
|
|
||||||
def _queue_exists_sync(self, queue_name):
|
|
||||||
"""Blocking queue existence check — run via asyncio.to_thread.
|
|
||||||
Uses passive=True which checks without creating."""
|
Uses passive=True which checks without creating."""
|
||||||
connection = None
|
connection = None
|
||||||
try:
|
try:
|
||||||
connection = pika.BlockingConnection(self._connection_params)
|
connection = pika.BlockingConnection(self._connection_params)
|
||||||
channel = connection.channel()
|
channel = connection.channel()
|
||||||
channel.queue_declare(queue=queue_name, passive=True)
|
channel.exchange_declare(
|
||||||
|
exchange=exchange_name, passive=True,
|
||||||
|
)
|
||||||
return True
|
return True
|
||||||
except pika.exceptions.ChannelClosedByBroker:
|
except pika.exceptions.ChannelClosedByBroker:
|
||||||
# 404 NOT_FOUND — queue does not exist
|
# 404 NOT_FOUND — exchange does not exist
|
||||||
return False
|
return False
|
||||||
finally:
|
finally:
|
||||||
if connection and connection.is_open:
|
if connection and connection.is_open:
|
||||||
|
|
@ -517,26 +491,25 @@ class RabbitMQBackend:
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def queue_exists(self, topic: str, subscription: str) -> bool:
|
async def topic_exists(self, topic: str) -> bool:
|
||||||
"""Check whether a named queue exists.
|
"""Check whether a topic's exchange exists.
|
||||||
|
|
||||||
Only applies to shared queues (flow/request class). Response and
|
Only applies to flow and request class topics. Response and
|
||||||
notify queues are anonymous/ephemeral — always returns False.
|
notify topics are ephemeral — always returns False.
|
||||||
"""
|
"""
|
||||||
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
|
exchange, cls, durable = self._parse_topic(topic)
|
||||||
|
|
||||||
if cls in ('response', 'notify'):
|
if cls in ('response', 'notify'):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
queue_name = f"{exchange}.{routing_key}.{subscription}"
|
|
||||||
return await asyncio.to_thread(
|
return await asyncio.to_thread(
|
||||||
self._queue_exists_sync, queue_name
|
self._topic_exists_sync, exchange
|
||||||
)
|
)
|
||||||
|
|
||||||
async def ensure_queue(self, topic: str, subscription: str) -> None:
|
async def ensure_topic(self, topic: str) -> None:
|
||||||
"""Ensure a queue exists, creating it if necessary."""
|
"""Ensure a topic exists, creating it if necessary."""
|
||||||
if not await self.queue_exists(topic, subscription):
|
if not await self.topic_exists(topic):
|
||||||
await self.create_queue(topic, subscription)
|
await self.create_topic(topic)
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
|
||||||
|
|
@ -124,9 +124,7 @@ class Processor(AsyncProcessor):
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
|
|
||||||
await self.pubsub.ensure_queue(
|
await self.pubsub.ensure_topic(self.config_request_topic)
|
||||||
self.config_request_topic, self.config_request_subscriber
|
|
||||||
)
|
|
||||||
await self.push() # Startup poke: empty types = everything
|
await self.push() # Startup poke: empty types = everything
|
||||||
await self.config_request_consumer.start()
|
await self.config_request_consumer.start()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -119,9 +119,7 @@ class Processor(AsyncProcessor):
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
|
|
||||||
await self.pubsub.ensure_queue(
|
await self.pubsub.ensure_topic(self.knowledge_request_topic)
|
||||||
self.knowledge_request_topic, self.knowledge_request_subscriber
|
|
||||||
)
|
|
||||||
await super(Processor, self).start()
|
await super(Processor, self).start()
|
||||||
await self.knowledge_request_consumer.start()
|
await self.knowledge_request_consumer.start()
|
||||||
await self.knowledge_response_producer.start()
|
await self.knowledge_response_producer.start()
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import logging
|
||||||
# Module logger
|
# Module logger
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Queue deletion retry settings
|
# Topic deletion retry settings
|
||||||
DELETE_RETRIES = 5
|
DELETE_RETRIES = 5
|
||||||
DELETE_RETRY_DELAY = 2 # seconds
|
DELETE_RETRY_DELAY = 2 # seconds
|
||||||
|
|
||||||
|
|
@ -215,11 +215,11 @@ class FlowConfig:
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
# Pre-create flow-level queues so the data path is wired
|
# Pre-create topic exchanges so the data path is wired
|
||||||
# before processors receive their config and start connecting.
|
# before processors receive their config and start connecting.
|
||||||
queues = self._collect_flow_queues(cls, repl_template_with_params)
|
topics = self._collect_flow_topics(cls, repl_template_with_params)
|
||||||
for topic, subscription in queues:
|
for topic in topics:
|
||||||
await self.pubsub.create_queue(topic, subscription)
|
await self.pubsub.create_topic(topic)
|
||||||
|
|
||||||
# Build all processor config updates, then write in a single batch.
|
# Build all processor config updates, then write in a single batch.
|
||||||
updates = []
|
updates = []
|
||||||
|
|
@ -283,8 +283,8 @@ class FlowConfig:
|
||||||
error = None,
|
error = None,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def ensure_existing_flow_queues(self):
|
async def ensure_existing_flow_topics(self):
|
||||||
"""Ensure queues exist for all already-running flows.
|
"""Ensure topics exist for all already-running flows.
|
||||||
|
|
||||||
Called on startup to handle flows that were started before this
|
Called on startup to handle flows that were started before this
|
||||||
version of the flow service was deployed, or before a restart.
|
version of the flow service was deployed, or before a restart.
|
||||||
|
|
@ -315,7 +315,7 @@ class FlowConfig:
|
||||||
if blueprint_data is None:
|
if blueprint_data is None:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Blueprint '{blueprint_name}' not found for "
|
f"Blueprint '{blueprint_name}' not found for "
|
||||||
f"flow '{flow_id}', skipping queue creation"
|
f"flow '{flow_id}', skipping topic creation"
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
@ -333,65 +333,63 @@ class FlowConfig:
|
||||||
)
|
)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
queues = self._collect_flow_queues(cls, repl_template)
|
topics = self._collect_flow_topics(cls, repl_template)
|
||||||
for topic, subscription in queues:
|
for topic in topics:
|
||||||
await self.pubsub.ensure_queue(topic, subscription)
|
await self.pubsub.ensure_topic(topic)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Ensured queues for existing flow '{flow_id}'"
|
f"Ensured topics for existing flow '{flow_id}'"
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
f"Failed to ensure queues for flow '{flow_id}': {e}"
|
f"Failed to ensure topics for flow '{flow_id}': {e}"
|
||||||
)
|
)
|
||||||
|
|
||||||
def _collect_flow_queues(self, cls, repl_template):
|
def _collect_flow_topics(self, cls, repl_template):
|
||||||
"""Collect (topic, subscription) pairs for all flow-level queues.
|
"""Collect unique topic identifiers from the blueprint.
|
||||||
|
|
||||||
Iterates the blueprint's "flow" section and reads only the
|
Iterates the blueprint's "flow" section and returns a
|
||||||
"topics" dict from each processor entry.
|
deduplicated set of resolved topic strings. The flow service
|
||||||
|
manages topic lifecycle (create/delete exchanges), not
|
||||||
|
individual consumer queues.
|
||||||
"""
|
"""
|
||||||
queues = []
|
topics = set()
|
||||||
|
|
||||||
for k, v in cls["flow"].items():
|
for k, v in cls["flow"].items():
|
||||||
processor, variant = k.split(":", 1)
|
|
||||||
variant = repl_template(variant)
|
|
||||||
|
|
||||||
for spec_name, topic_template in v.get("topics", {}).items():
|
for spec_name, topic_template in v.get("topics", {}).items():
|
||||||
topic = repl_template(topic_template)
|
topic = repl_template(topic_template)
|
||||||
subscription = f"{processor}--{variant}--{spec_name}"
|
topics.add(topic)
|
||||||
queues.append((topic, subscription))
|
|
||||||
|
|
||||||
return queues
|
return topics
|
||||||
|
|
||||||
async def _delete_queues(self, queues):
|
async def _delete_topics(self, topics):
|
||||||
"""Delete queues with retries. Best-effort — logs failures but
|
"""Delete topics with retries. Best-effort — logs failures but
|
||||||
does not raise."""
|
does not raise."""
|
||||||
for attempt in range(DELETE_RETRIES):
|
for attempt in range(DELETE_RETRIES):
|
||||||
remaining = []
|
remaining = []
|
||||||
|
|
||||||
for topic, subscription in queues:
|
for topic in topics:
|
||||||
try:
|
try:
|
||||||
await self.pubsub.delete_queue(topic, subscription)
|
await self.pubsub.delete_topic(topic)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Queue delete failed (attempt {attempt + 1}/"
|
f"Topic delete failed (attempt {attempt + 1}/"
|
||||||
f"{DELETE_RETRIES}): {topic}: {e}"
|
f"{DELETE_RETRIES}): {topic}: {e}"
|
||||||
)
|
)
|
||||||
remaining.append((topic, subscription))
|
remaining.append(topic)
|
||||||
|
|
||||||
if not remaining:
|
if not remaining:
|
||||||
return
|
return
|
||||||
|
|
||||||
queues = remaining
|
topics = remaining
|
||||||
|
|
||||||
if attempt < DELETE_RETRIES - 1:
|
if attempt < DELETE_RETRIES - 1:
|
||||||
await asyncio.sleep(DELETE_RETRY_DELAY)
|
await asyncio.sleep(DELETE_RETRY_DELAY)
|
||||||
|
|
||||||
for topic, subscription in queues:
|
for topic in topics:
|
||||||
logger.error(
|
logger.error(
|
||||||
f"Failed to delete queue after {DELETE_RETRIES} "
|
f"Failed to delete topic after {DELETE_RETRIES} "
|
||||||
f"attempts: {topic}"
|
f"attempts: {topic}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -426,8 +424,8 @@ class FlowConfig:
|
||||||
result = result.replace(f"{{{param_name}}}", str(param_value))
|
result = result.replace(f"{{{param_name}}}", str(param_value))
|
||||||
return result
|
return result
|
||||||
|
|
||||||
# Collect queue identifiers before removing config
|
# Collect topic identifiers before removing config
|
||||||
queues = self._collect_flow_queues(cls, repl_template)
|
topics = self._collect_flow_topics(cls, repl_template)
|
||||||
|
|
||||||
# Phase 1: Set status to "stopping" and remove processor config.
|
# Phase 1: Set status to "stopping" and remove processor config.
|
||||||
# The config push tells processors to shut down their consumers.
|
# The config push tells processors to shut down their consumers.
|
||||||
|
|
@ -448,8 +446,8 @@ class FlowConfig:
|
||||||
|
|
||||||
await self.config.delete_many(deletes)
|
await self.config.delete_many(deletes)
|
||||||
|
|
||||||
# Phase 2: Delete queues with retries, then remove the flow record.
|
# Phase 2: Delete topics with retries, then remove the flow record.
|
||||||
await self._delete_queues(queues)
|
await self._delete_topics(topics)
|
||||||
|
|
||||||
if msg.flow_id in await self.config.keys("flow"):
|
if msg.flow_id in await self.config.keys("flow"):
|
||||||
await self.config.delete("flow", msg.flow_id)
|
await self.config.delete("flow", msg.flow_id)
|
||||||
|
|
|
||||||
|
|
@ -101,11 +101,9 @@ class Processor(AsyncProcessor):
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
|
|
||||||
await self.pubsub.ensure_queue(
|
await self.pubsub.ensure_topic(self.flow_request_topic)
|
||||||
self.flow_request_topic, self.flow_request_subscriber
|
|
||||||
)
|
|
||||||
await self.config_client.start()
|
await self.config_client.start()
|
||||||
await self.flow.ensure_existing_flow_queues()
|
await self.flow.ensure_existing_flow_topics()
|
||||||
await self.flow_request_consumer.start()
|
await self.flow_request_consumer.start()
|
||||||
|
|
||||||
async def on_flow_request(self, msg, consumer, flow):
|
async def on_flow_request(self, msg, consumer, flow):
|
||||||
|
|
|
||||||
|
|
@ -263,12 +263,8 @@ class Processor(AsyncProcessor):
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
|
|
||||||
await self.pubsub.ensure_queue(
|
await self.pubsub.ensure_topic(self.librarian_request_topic)
|
||||||
self.librarian_request_topic, self.librarian_request_subscriber
|
await self.pubsub.ensure_topic(self.collection_request_topic)
|
||||||
)
|
|
||||||
await self.pubsub.ensure_queue(
|
|
||||||
self.collection_request_topic, self.collection_request_subscriber
|
|
||||||
)
|
|
||||||
await super(Processor, self).start()
|
await super(Processor, self).start()
|
||||||
await self.librarian_request_consumer.start()
|
await self.librarian_request_consumer.start()
|
||||||
await self.librarian_response_producer.start()
|
await self.librarian_response_producer.start()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue