Refactor: Derive consumer behaviour from queue class (#772)

Derive consumer behaviour from the queue class, removing the
consumer_type parameter.

The queue class prefix (flow, request, response, notify) now
fully determines consumer behaviour in both RabbitMQ and Pulsar
backends.  Added 'notify' class for ephemeral broadcast (config
push notifications).  Response and notify classes always create
per-subscriber auto-delete queues, eliminating orphaned queues
that accumulated on service restarts.

Change init-trustgraph to set up the 'notify' namespace in
Pulsar, replacing the now-obsolete 'state' namespace.

Fixes the 'stuck backlog' on the RabbitMQ config notification queue.
This commit is contained in:
cybermaggedon 2026-04-09 09:55:41 +01:00 committed by GitHub
parent aff96e57cb
commit feeb92b33f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 93 additions and 95 deletions

View file

@ -151,7 +151,7 @@ class TestConfigReceiver:
mock_backend = Mock()
config_receiver = ConfigReceiver(mock_backend)
# Mock config_client
# Mock _create_config_client to return a mock client
mock_resp = Mock()
mock_resp.error = None
mock_resp.version = 5
@ -164,7 +164,7 @@ class TestConfigReceiver:
mock_client = AsyncMock()
mock_client.request.return_value = mock_resp
config_receiver.config_client = mock_client
config_receiver._create_config_client = Mock(return_value=mock_client)
start_flow_calls = []
async def mock_start_flow(id, flow):
@ -202,7 +202,7 @@ class TestConfigReceiver:
mock_client = AsyncMock()
mock_client.request.return_value = mock_resp
config_receiver.config_client = mock_client
config_receiver._create_config_client = Mock(return_value=mock_client)
stop_flow_calls = []
async def mock_stop_flow(id, flow):
@ -229,7 +229,7 @@ class TestConfigReceiver:
mock_client = AsyncMock()
mock_client.request.return_value = mock_resp
config_receiver.config_client = mock_client
config_receiver._create_config_client = Mock(return_value=mock_client)
await config_receiver.fetch_and_apply()
@ -353,7 +353,7 @@ class TestConfigReceiver:
mock_client = AsyncMock()
mock_client.request.return_value = mock_resp
config_receiver.config_client = mock_client
config_receiver._create_config_client = Mock(return_value=mock_client)
start_calls = []
stop_calls = []

View file

@ -21,8 +21,8 @@ class TestQueueFunction:
def test_response_class(self):
assert queue('config', cls='response') == 'response:tg:config'
def test_state_class(self):
assert queue('config', cls='state') == 'state:tg:config'
def test_notify_class(self):
assert queue('config', cls='notify') == 'notify:tg:config'
def test_custom_topicspace(self):
assert queue('config', cls='request', topicspace='prod') == 'request:prod:config'
@ -44,9 +44,9 @@ class TestPulsarMapTopic:
assert backend.map_topic('flow:tg:text-completion-request') == \
'persistent://tg/flow/text-completion-request'
def test_state_maps_to_persistent(self, backend):
assert backend.map_topic('state:tg:config') == \
'persistent://tg/state/config'
def test_notify_maps_to_non_persistent(self, backend):
assert backend.map_topic('notify:tg:config') == \
'non-persistent://tg/notify/config'
def test_request_maps_to_non_persistent(self, backend):
assert backend.map_topic('request:tg:config') == \
@ -153,7 +153,7 @@ class TestQueueDefinitions:
def test_config_push(self):
from trustgraph.schema.services.config import config_push_queue
assert config_push_queue == 'flow:tg:config'
assert config_push_queue == 'notify:tg:config'
def test_librarian_request(self):
from trustgraph.schema.services.library import librarian_request_queue

View file

@ -24,10 +24,10 @@ class TestRabbitMQMapQueueName:
assert durable is True
assert name == 'tg.flow.text-completion-request'
def test_state_is_durable(self, backend):
name, durable = backend.map_queue_name('state:tg:config')
assert durable is True
assert name == 'tg.state.config'
def test_notify_is_not_durable(self, backend):
name, durable = backend.map_queue_name('notify:tg:config')
assert durable is False
assert name == 'tg.notify.config'
def test_request_is_not_durable(self, backend):
name, durable = backend.map_queue_name('request:tg:config')

View file

@ -124,18 +124,22 @@ class PubSubBackend(Protocol):
subscription: str,
schema: type,
initial_position: str = 'latest',
consumer_type: str = 'shared',
**options
) -> BackendConsumer:
"""
Create a consumer for a topic.
Consumer behaviour is determined by the topic's class prefix:
- flow: shared competing consumers, durable named queue
- request: shared competing consumers, non-durable named queue
- response: exclusive per-subscriber, anonymous auto-delete queue
- notify: exclusive per-subscriber, anonymous auto-delete queue
Args:
topic: Generic topic format (qos/tenant/namespace/queue)
topic: Queue identifier in class:topicspace:topic format
subscription: Subscription/consumer group name
schema: Dataclass type for messages
initial_position: 'earliest' or 'latest' (some backends may ignore)
consumer_type: 'shared', 'exclusive', 'failover' (some backends may ignore)
**options: Backend-specific options
Returns:

View file

@ -33,19 +33,17 @@ class Consumer:
rate_limit_retry_time = 10, rate_limit_timeout = 7200,
reconnect_time = 5,
concurrency = 1, # Number of concurrent requests to handle
consumer_type = 'shared',
**kwargs,
):
self.taskgroup = taskgroup
self.flow = flow
self.backend = backend # Changed from 'client' to 'backend'
self.backend = backend
self.topic = topic
self.subscriber = subscriber
self.schema = schema
self.handler = handler
self.consumer_type = consumer_type
self.rate_limit_retry_time = rate_limit_retry_time
self.rate_limit_timeout = rate_limit_timeout
@ -129,7 +127,6 @@ class Consumer:
subscription = self.subscriber,
schema = self.schema,
initial_position = initial_pos,
consumer_type = self.consumer_type,
),
)
consumers.append(c)

View file

@ -64,7 +64,6 @@ class LibrarianClient:
schema=LibrarianResponse,
handler=self._on_response,
metrics=librarian_response_metrics,
consumer_type='exclusive',
)
# Single-response requests: request_id -> asyncio.Future

View file

@ -159,14 +159,16 @@ class PulsarBackend:
cls, topicspace, topic = parts
# Map class to Pulsar persistence and namespace
if cls in ('flow', 'state'):
if cls == 'flow':
persistence = 'persistent'
elif cls in ('request', 'response'):
persistence = 'non-persistent'
elif cls == 'notify':
persistence = 'non-persistent'
else:
raise ValueError(
f"Invalid queue class: {cls}, "
f"expected flow, request, response, or state"
f"expected flow, request, response, or notify"
)
return f"{persistence}://{topicspace}/{cls}/{topic}"
@ -205,18 +207,20 @@ class PulsarBackend:
subscription: str,
schema: type,
initial_position: str = 'latest',
consumer_type: str = 'shared',
**options
) -> BackendConsumer:
"""
Create a Pulsar consumer.
Consumer type is derived from the topic's class prefix:
- flow/request: Shared (competing consumers)
- response/notify: Exclusive (per-subscriber)
Args:
topic: Generic topic format (qos/tenant/namespace/queue)
topic: Queue identifier in class:topicspace:topic format
subscription: Subscription name
schema: Dataclass type for messages
initial_position: 'earliest' or 'latest'
consumer_type: 'shared', 'exclusive', or 'failover'
**options: Backend-specific options
Returns:
@ -224,17 +228,18 @@ class PulsarBackend:
"""
pulsar_topic = self.map_topic(topic)
# Extract class from topic for consumer type mapping
cls = topic.split(':', 1)[0] if ':' in topic else 'flow'
# Map initial position
if initial_position == 'earliest':
pos = pulsar.InitialPosition.Earliest
else:
pos = pulsar.InitialPosition.Latest
# Map consumer type
if consumer_type == 'exclusive':
# Map consumer type from class
if cls in ('response', 'notify'):
ctype = pulsar.ConsumerType.Exclusive
elif consumer_type == 'failover':
ctype = pulsar.ConsumerType.Failover
else:
ctype = pulsar.ConsumerType.Shared

View file

@ -311,14 +311,14 @@ class RabbitMQBackend:
cls, topicspace, topic = parts
if cls in ('flow', 'state'):
if cls == 'flow':
durable = True
elif cls in ('request', 'response'):
elif cls in ('request', 'response', 'notify'):
durable = False
else:
raise ValueError(
f"Invalid queue class: {cls}, "
f"expected flow, request, response, or state"
f"expected flow, request, response, or notify"
)
# Exchange per topicspace, routing key includes class
@ -345,26 +345,19 @@ class RabbitMQBackend:
def create_consumer(self, topic: str, subscription: str, schema: type,
initial_position: str = 'latest',
consumer_type: str = 'shared',
**options) -> BackendConsumer:
"""Create a consumer with a queue bound to the topic exchange.
consumer_type='shared': Named durable queue. Multiple consumers
with the same subscription compete (round-robin).
consumer_type='exclusive': Anonymous ephemeral queue. Each
consumer gets its own copy of every message (broadcast).
Behaviour is determined by the topic's class prefix:
- flow: named durable queue, competing consumers (round-robin)
- request: named non-durable queue, competing consumers
- response: anonymous ephemeral queue, per-subscriber (auto-delete)
- notify: anonymous ephemeral queue, per-subscriber (auto-delete)
"""
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
if consumer_type == 'exclusive' and cls == 'state':
# State broadcast: named durable queue per subscriber.
# Retains messages so late-starting processors see current state.
queue_name = f"{exchange}.{routing_key}.{subscription}"
queue_durable = True
exclusive = False
auto_delete = False
elif consumer_type == 'exclusive':
# Broadcast: anonymous queue, auto-deleted on disconnect
if cls in ('response', 'notify'):
# Per-subscriber: anonymous queue, auto-deleted on disconnect
queue_name = ''
queue_durable = False
exclusive = True
@ -379,7 +372,7 @@ class RabbitMQBackend:
logger.debug(
f"Creating consumer: exchange={exchange}, "
f"routing_key={routing_key}, queue={queue_name or '(anonymous)'}, "
f"type={consumer_type}"
f"cls={cls}"
)
return RabbitMQBackendConsumer(

View file

@ -84,7 +84,6 @@ class Subscriber:
topic=self.topic,
subscription=self.subscription,
schema=self.schema,
consumer_type='exclusive',
),
)

View file

@ -42,7 +42,6 @@ class BaseClient:
topic=output_queue,
subscription=subscriber,
schema=output_schema,
consumer_type='shared',
)
self.input_schema = input_schema

View file

@ -6,10 +6,10 @@ def queue(topic, cls='flow', topicspace='tg'):
Args:
topic: The logical queue name (e.g. 'config', 'librarian')
cls: Queue class determining operational characteristics:
- 'flow' = persistent processing pipeline queue
- 'request' = non-persistent, short TTL request queue
- 'response' = non-persistent, short TTL response queue
- 'state' = persistent, last-value state broadcast
- 'flow' = persistent shared work queue (competing consumers)
- 'request' = non-persistent RPC request queue (shared)
- 'response' = non-persistent RPC response queue (per-subscriber)
- 'notify' = ephemeral broadcast (per-subscriber, auto-delete)
topicspace: Deployment isolation prefix (default: 'tg')
Returns:
@ -20,7 +20,7 @@ def queue(topic, cls='flow', topicspace='tg'):
# flow:tg:text-completion-request
queue('config', cls='request')
# request:tg:config
queue('config', cls='state')
# state:tg:config
queue('config', cls='notify')
# notify:tg:config
"""
return f"{cls}:{topicspace}:{topic}"

View file

@ -62,7 +62,7 @@ class ConfigPush:
config_request_queue = queue('config', cls='request')
config_response_queue = queue('config', cls='response')
config_push_queue = queue('config', cls='flow')
config_push_queue = queue('config', cls='notify')
############################################################################

View file

@ -138,10 +138,10 @@ def init_pulsar(pulsar_admin_url, tenant):
}
})
ensure_namespace(pulsar_admin_url, tenant, "state", {
ensure_namespace(pulsar_admin_url, tenant, "notify", {
"retention_policies": {
"retentionSizeInMB": 10,
"retentionTimeInMinutes": -1,
"retentionSizeInMB": -1,
"retentionTimeInMinutes": 3,
"subscriptionExpirationTimeMinutes": 5,
}
})

View file

@ -123,7 +123,6 @@ async def monitor(flow, queue_type, max_lines, max_width, **config):
topic=request_queue,
subscription="prompt-monitor-req",
schema=None,
consumer_type='shared',
initial_position='latest',
)
@ -131,7 +130,6 @@ async def monitor(flow, queue_type, max_lines, max_width, **config):
topic=response_queue,
subscription="prompt-monitor-resp",
schema=None,
consumer_type='shared',
initial_position='latest',
)

View file

@ -73,6 +73,31 @@ class ConfigReceiver:
f"Config notify processing exception: {e}", exc_info=True
)
def _create_config_client(self):
"""Create a short-lived config request/response client."""
id = str(uuid.uuid4())
config_req_metrics = ProducerMetrics(
processor="api-gateway", flow=None,
name="config-request",
)
config_resp_metrics = SubscriberMetrics(
processor="api-gateway", flow=None,
name="config-response",
)
return RequestResponse(
backend=self.backend,
subscription=f"api-gateway--config--{id}",
consumer_name="api-gateway",
request_topic=config_request_queue,
request_schema=ConfigRequest,
request_metrics=config_req_metrics,
response_topic=config_response_queue,
response_schema=ConfigResponse,
response_metrics=config_resp_metrics,
)
async def fetch_and_apply(self, retry=False):
"""Fetch full config and apply flow changes.
If retry=True, keeps retrying until successful."""
@ -82,10 +107,15 @@ class ConfigReceiver:
try:
logger.info("Fetching config from config service...")
resp = await self.config_client.request(
client = self._create_config_client()
try:
await client.start()
resp = await client.request(
ConfigRequest(operation="config"),
timeout=10,
)
finally:
await client.stop()
logger.info(f"Config response received")
@ -170,32 +200,6 @@ class ConfigReceiver:
id = str(uuid.uuid4())
# Config request/response client
config_req_metrics = ProducerMetrics(
processor="api-gateway", flow=None,
name="config-request",
)
config_resp_metrics = SubscriberMetrics(
processor="api-gateway", flow=None,
name="config-response",
)
self.config_client = RequestResponse(
backend=self.backend,
subscription=f"api-gateway--config--{id}",
consumer_name="api-gateway",
request_topic=config_request_queue,
request_schema=ConfigRequest,
request_metrics=config_req_metrics,
response_topic=config_response_queue,
response_schema=ConfigResponse,
response_metrics=config_resp_metrics,
)
logger.info("Starting config request/response client...")
await self.config_client.start()
logger.info("Config request/response client started")
# Subscribe to notify queue
self.config_cons = Consumer(
taskgroup=tg,