mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 08:26:21 +02:00
Refactor: Derive consumer behaviour from queue class (#772)
Derive consumer behaviour from queue class, remove consumer_type parameter The queue class prefix (flow, request, response, notify) now fully determines consumer behaviour in both RabbitMQ and Pulsar backends. Added 'notify' class for ephemeral broadcast (config push notifications). Response and notify classes always create per-subscriber auto-delete queues, eliminating orphaned queues that accumulated on service restarts. Change init-trustgraph to set up the 'notify' namespace in Pulsar instead of old hangover 'state'. Fixes 'stuck backlog' on RabbitMQ config notification queue.
This commit is contained in:
parent
aff96e57cb
commit
feeb92b33f
15 changed files with 93 additions and 95 deletions
|
|
@ -124,18 +124,22 @@ class PubSubBackend(Protocol):
|
|||
subscription: str,
|
||||
schema: type,
|
||||
initial_position: str = 'latest',
|
||||
consumer_type: str = 'shared',
|
||||
**options
|
||||
) -> BackendConsumer:
|
||||
"""
|
||||
Create a consumer for a topic.
|
||||
|
||||
Consumer behaviour is determined by the topic's class prefix:
|
||||
- flow: shared competing consumers, durable named queue
|
||||
- request: shared competing consumers, non-durable named queue
|
||||
- response: exclusive per-subscriber, anonymous auto-delete queue
|
||||
- notify: exclusive per-subscriber, anonymous auto-delete queue
|
||||
|
||||
Args:
|
||||
topic: Generic topic format (qos/tenant/namespace/queue)
|
||||
topic: Queue identifier in class:topicspace:topic format
|
||||
subscription: Subscription/consumer group name
|
||||
schema: Dataclass type for messages
|
||||
initial_position: 'earliest' or 'latest' (some backends may ignore)
|
||||
consumer_type: 'shared', 'exclusive', 'failover' (some backends may ignore)
|
||||
**options: Backend-specific options
|
||||
|
||||
Returns:
|
||||
|
|
|
|||
|
|
@ -33,19 +33,17 @@ class Consumer:
|
|||
rate_limit_retry_time = 10, rate_limit_timeout = 7200,
|
||||
reconnect_time = 5,
|
||||
concurrency = 1, # Number of concurrent requests to handle
|
||||
consumer_type = 'shared',
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
self.taskgroup = taskgroup
|
||||
self.flow = flow
|
||||
self.backend = backend # Changed from 'client' to 'backend'
|
||||
self.backend = backend
|
||||
self.topic = topic
|
||||
self.subscriber = subscriber
|
||||
self.schema = schema
|
||||
self.handler = handler
|
||||
|
||||
self.consumer_type = consumer_type
|
||||
|
||||
self.rate_limit_retry_time = rate_limit_retry_time
|
||||
self.rate_limit_timeout = rate_limit_timeout
|
||||
|
||||
|
|
@ -129,7 +127,6 @@ class Consumer:
|
|||
subscription = self.subscriber,
|
||||
schema = self.schema,
|
||||
initial_position = initial_pos,
|
||||
consumer_type = self.consumer_type,
|
||||
),
|
||||
)
|
||||
consumers.append(c)
|
||||
|
|
|
|||
|
|
@ -64,7 +64,6 @@ class LibrarianClient:
|
|||
schema=LibrarianResponse,
|
||||
handler=self._on_response,
|
||||
metrics=librarian_response_metrics,
|
||||
consumer_type='exclusive',
|
||||
)
|
||||
|
||||
# Single-response requests: request_id -> asyncio.Future
|
||||
|
|
|
|||
|
|
@ -159,14 +159,16 @@ class PulsarBackend:
|
|||
cls, topicspace, topic = parts
|
||||
|
||||
# Map class to Pulsar persistence and namespace
|
||||
if cls in ('flow', 'state'):
|
||||
if cls == 'flow':
|
||||
persistence = 'persistent'
|
||||
elif cls in ('request', 'response'):
|
||||
persistence = 'non-persistent'
|
||||
elif cls == 'notify':
|
||||
persistence = 'non-persistent'
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Invalid queue class: {cls}, "
|
||||
f"expected flow, request, response, or state"
|
||||
f"expected flow, request, response, or notify"
|
||||
)
|
||||
|
||||
return f"{persistence}://{topicspace}/{cls}/{topic}"
|
||||
|
|
@ -205,18 +207,20 @@ class PulsarBackend:
|
|||
subscription: str,
|
||||
schema: type,
|
||||
initial_position: str = 'latest',
|
||||
consumer_type: str = 'shared',
|
||||
**options
|
||||
) -> BackendConsumer:
|
||||
"""
|
||||
Create a Pulsar consumer.
|
||||
|
||||
Consumer type is derived from the topic's class prefix:
|
||||
- flow/request: Shared (competing consumers)
|
||||
- response/notify: Exclusive (per-subscriber)
|
||||
|
||||
Args:
|
||||
topic: Generic topic format (qos/tenant/namespace/queue)
|
||||
topic: Queue identifier in class:topicspace:topic format
|
||||
subscription: Subscription name
|
||||
schema: Dataclass type for messages
|
||||
initial_position: 'earliest' or 'latest'
|
||||
consumer_type: 'shared', 'exclusive', or 'failover'
|
||||
**options: Backend-specific options
|
||||
|
||||
Returns:
|
||||
|
|
@ -224,17 +228,18 @@ class PulsarBackend:
|
|||
"""
|
||||
pulsar_topic = self.map_topic(topic)
|
||||
|
||||
# Extract class from topic for consumer type mapping
|
||||
cls = topic.split(':', 1)[0] if ':' in topic else 'flow'
|
||||
|
||||
# Map initial position
|
||||
if initial_position == 'earliest':
|
||||
pos = pulsar.InitialPosition.Earliest
|
||||
else:
|
||||
pos = pulsar.InitialPosition.Latest
|
||||
|
||||
# Map consumer type
|
||||
if consumer_type == 'exclusive':
|
||||
# Map consumer type from class
|
||||
if cls in ('response', 'notify'):
|
||||
ctype = pulsar.ConsumerType.Exclusive
|
||||
elif consumer_type == 'failover':
|
||||
ctype = pulsar.ConsumerType.Failover
|
||||
else:
|
||||
ctype = pulsar.ConsumerType.Shared
|
||||
|
||||
|
|
|
|||
|
|
@ -311,14 +311,14 @@ class RabbitMQBackend:
|
|||
|
||||
cls, topicspace, topic = parts
|
||||
|
||||
if cls in ('flow', 'state'):
|
||||
if cls == 'flow':
|
||||
durable = True
|
||||
elif cls in ('request', 'response'):
|
||||
elif cls in ('request', 'response', 'notify'):
|
||||
durable = False
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Invalid queue class: {cls}, "
|
||||
f"expected flow, request, response, or state"
|
||||
f"expected flow, request, response, or notify"
|
||||
)
|
||||
|
||||
# Exchange per topicspace, routing key includes class
|
||||
|
|
@ -345,26 +345,19 @@ class RabbitMQBackend:
|
|||
|
||||
def create_consumer(self, topic: str, subscription: str, schema: type,
|
||||
initial_position: str = 'latest',
|
||||
consumer_type: str = 'shared',
|
||||
**options) -> BackendConsumer:
|
||||
"""Create a consumer with a queue bound to the topic exchange.
|
||||
|
||||
consumer_type='shared': Named durable queue. Multiple consumers
|
||||
with the same subscription compete (round-robin).
|
||||
consumer_type='exclusive': Anonymous ephemeral queue. Each
|
||||
consumer gets its own copy of every message (broadcast).
|
||||
Behaviour is determined by the topic's class prefix:
|
||||
- flow: named durable queue, competing consumers (round-robin)
|
||||
- request: named non-durable queue, competing consumers
|
||||
- response: anonymous ephemeral queue, per-subscriber (auto-delete)
|
||||
- notify: anonymous ephemeral queue, per-subscriber (auto-delete)
|
||||
"""
|
||||
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
|
||||
|
||||
if consumer_type == 'exclusive' and cls == 'state':
|
||||
# State broadcast: named durable queue per subscriber.
|
||||
# Retains messages so late-starting processors see current state.
|
||||
queue_name = f"{exchange}.{routing_key}.{subscription}"
|
||||
queue_durable = True
|
||||
exclusive = False
|
||||
auto_delete = False
|
||||
elif consumer_type == 'exclusive':
|
||||
# Broadcast: anonymous queue, auto-deleted on disconnect
|
||||
if cls in ('response', 'notify'):
|
||||
# Per-subscriber: anonymous queue, auto-deleted on disconnect
|
||||
queue_name = ''
|
||||
queue_durable = False
|
||||
exclusive = True
|
||||
|
|
@ -379,7 +372,7 @@ class RabbitMQBackend:
|
|||
logger.debug(
|
||||
f"Creating consumer: exchange={exchange}, "
|
||||
f"routing_key={routing_key}, queue={queue_name or '(anonymous)'}, "
|
||||
f"type={consumer_type}"
|
||||
f"cls={cls}"
|
||||
)
|
||||
|
||||
return RabbitMQBackendConsumer(
|
||||
|
|
|
|||
|
|
@ -84,7 +84,6 @@ class Subscriber:
|
|||
topic=self.topic,
|
||||
subscription=self.subscription,
|
||||
schema=self.schema,
|
||||
consumer_type='exclusive',
|
||||
),
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -42,7 +42,6 @@ class BaseClient:
|
|||
topic=output_queue,
|
||||
subscription=subscriber,
|
||||
schema=output_schema,
|
||||
consumer_type='shared',
|
||||
)
|
||||
|
||||
self.input_schema = input_schema
|
||||
|
|
|
|||
|
|
@ -6,10 +6,10 @@ def queue(topic, cls='flow', topicspace='tg'):
|
|||
Args:
|
||||
topic: The logical queue name (e.g. 'config', 'librarian')
|
||||
cls: Queue class determining operational characteristics:
|
||||
- 'flow' = persistent processing pipeline queue
|
||||
- 'request' = non-persistent, short TTL request queue
|
||||
- 'response' = non-persistent, short TTL response queue
|
||||
- 'state' = persistent, last-value state broadcast
|
||||
- 'flow' = persistent shared work queue (competing consumers)
|
||||
- 'request' = non-persistent RPC request queue (shared)
|
||||
- 'response' = non-persistent RPC response queue (per-subscriber)
|
||||
- 'notify' = ephemeral broadcast (per-subscriber, auto-delete)
|
||||
topicspace: Deployment isolation prefix (default: 'tg')
|
||||
|
||||
Returns:
|
||||
|
|
@ -20,7 +20,7 @@ def queue(topic, cls='flow', topicspace='tg'):
|
|||
# flow:tg:text-completion-request
|
||||
queue('config', cls='request')
|
||||
# request:tg:config
|
||||
queue('config', cls='state')
|
||||
# state:tg:config
|
||||
queue('config', cls='notify')
|
||||
# notify:tg:config
|
||||
"""
|
||||
return f"{cls}:{topicspace}:{topic}"
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ class ConfigPush:
|
|||
|
||||
config_request_queue = queue('config', cls='request')
|
||||
config_response_queue = queue('config', cls='response')
|
||||
config_push_queue = queue('config', cls='flow')
|
||||
config_push_queue = queue('config', cls='notify')
|
||||
|
||||
############################################################################
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue