diff --git a/tests/unit/test_gateway/test_config_receiver.py b/tests/unit/test_gateway/test_config_receiver.py index 49dc48d8..c2a149d5 100644 --- a/tests/unit/test_gateway/test_config_receiver.py +++ b/tests/unit/test_gateway/test_config_receiver.py @@ -151,7 +151,7 @@ class TestConfigReceiver: mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - # Mock config_client + # Mock _create_config_client to return a mock client mock_resp = Mock() mock_resp.error = None mock_resp.version = 5 @@ -164,7 +164,7 @@ class TestConfigReceiver: mock_client = AsyncMock() mock_client.request.return_value = mock_resp - config_receiver.config_client = mock_client + config_receiver._create_config_client = Mock(return_value=mock_client) start_flow_calls = [] async def mock_start_flow(id, flow): @@ -202,7 +202,7 @@ class TestConfigReceiver: mock_client = AsyncMock() mock_client.request.return_value = mock_resp - config_receiver.config_client = mock_client + config_receiver._create_config_client = Mock(return_value=mock_client) stop_flow_calls = [] async def mock_stop_flow(id, flow): @@ -229,7 +229,7 @@ class TestConfigReceiver: mock_client = AsyncMock() mock_client.request.return_value = mock_resp - config_receiver.config_client = mock_client + config_receiver._create_config_client = Mock(return_value=mock_client) await config_receiver.fetch_and_apply() @@ -353,7 +353,7 @@ class TestConfigReceiver: mock_client = AsyncMock() mock_client.request.return_value = mock_resp - config_receiver.config_client = mock_client + config_receiver._create_config_client = Mock(return_value=mock_client) start_calls = [] stop_calls = [] diff --git a/tests/unit/test_pubsub/test_queue_naming.py b/tests/unit/test_pubsub/test_queue_naming.py index 8ab09e5a..243b0e33 100644 --- a/tests/unit/test_pubsub/test_queue_naming.py +++ b/tests/unit/test_pubsub/test_queue_naming.py @@ -21,8 +21,8 @@ class TestQueueFunction: def test_response_class(self): assert queue('config', cls='response') == 'response:tg:config' - def test_state_class(self): - assert queue('config', cls='state') == 'state:tg:config' + def test_notify_class(self): + assert queue('config', cls='notify') == 'notify:tg:config' def test_custom_topicspace(self): assert queue('config', cls='request', topicspace='prod') == 'request:prod:config' @@ -44,9 +44,9 @@ class TestPulsarMapTopic: assert backend.map_topic('flow:tg:text-completion-request') == \ 'persistent://tg/flow/text-completion-request' - def test_state_maps_to_persistent(self, backend): - assert backend.map_topic('state:tg:config') == \ - 'persistent://tg/state/config' + def test_notify_maps_to_non_persistent(self, backend): + assert backend.map_topic('notify:tg:config') == \ + 'non-persistent://tg/notify/config' def test_request_maps_to_non_persistent(self, backend): assert backend.map_topic('request:tg:config') == \ @@ -153,7 +153,7 @@ class TestQueueDefinitions: def test_config_push(self): from trustgraph.schema.services.config import config_push_queue - assert config_push_queue == 'flow:tg:config' + assert config_push_queue == 'notify:tg:config' def test_librarian_request(self): from trustgraph.schema.services.library import librarian_request_queue diff --git a/tests/unit/test_pubsub/test_rabbitmq_backend.py b/tests/unit/test_pubsub/test_rabbitmq_backend.py index 578db3b6..ffe18fd7 100644 --- a/tests/unit/test_pubsub/test_rabbitmq_backend.py +++ b/tests/unit/test_pubsub/test_rabbitmq_backend.py @@ -24,10 +24,10 @@ class TestRabbitMQMapQueueName: assert durable is True assert name == 'tg.flow.text-completion-request' - def test_state_is_durable(self, backend): - name, durable = backend.map_queue_name('state:tg:config') - assert durable is True - assert name == 'tg.state.config' + def test_notify_is_not_durable(self, backend): + name, durable = backend.map_queue_name('notify:tg:config') + assert durable is False + assert name == 'tg.notify.config' def test_request_is_not_durable(self, backend): name, durable = backend.map_queue_name('request:tg:config') diff --git a/trustgraph-base/trustgraph/base/backend.py b/trustgraph-base/trustgraph/base/backend.py index b9f5f923..9b9a42af 100644 --- a/trustgraph-base/trustgraph/base/backend.py +++ b/trustgraph-base/trustgraph/base/backend.py @@ -124,18 +124,22 @@ class PubSubBackend(Protocol): subscription: str, schema: type, initial_position: str = 'latest', - consumer_type: str = 'shared', **options ) -> BackendConsumer: """ Create a consumer for a topic. + Consumer behaviour is determined by the topic's class prefix: + - flow: shared competing consumers, durable named queue + - request: shared competing consumers, non-durable named queue + - response: exclusive per-subscriber, anonymous auto-delete queue + - notify: exclusive per-subscriber, anonymous auto-delete queue + Args: - topic: Generic topic format (qos/tenant/namespace/queue) + topic: Queue identifier in class:topicspace:topic format subscription: Subscription/consumer group name schema: Dataclass type for messages initial_position: 'earliest' or 'latest' (some backends may ignore) - consumer_type: 'shared', 'exclusive', 'failover' (some backends may ignore) **options: Backend-specific options Returns: diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index b6c28bbe..1b9e5999 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -33,19 +33,17 @@ class Consumer: rate_limit_retry_time = 10, rate_limit_timeout = 7200, reconnect_time = 5, concurrency = 1, # Number of concurrent requests to handle - consumer_type = 'shared', + **kwargs, ): self.taskgroup = taskgroup self.flow = flow - self.backend = backend # Changed from 'client' to 'backend' + self.backend = backend self.topic = topic self.subscriber = subscriber self.schema = schema self.handler = handler - self.consumer_type = consumer_type - self.rate_limit_retry_time = rate_limit_retry_time self.rate_limit_timeout = rate_limit_timeout @@ -129,7 +127,6 @@ class Consumer: subscription = self.subscriber, schema = self.schema, initial_position = initial_pos, - consumer_type = self.consumer_type, ), ) consumers.append(c) diff --git a/trustgraph-base/trustgraph/base/librarian_client.py b/trustgraph-base/trustgraph/base/librarian_client.py index 6191cff8..5ad97f47 100644 --- a/trustgraph-base/trustgraph/base/librarian_client.py +++ b/trustgraph-base/trustgraph/base/librarian_client.py @@ -64,7 +64,6 @@ class LibrarianClient: schema=LibrarianResponse, handler=self._on_response, metrics=librarian_response_metrics, - consumer_type='exclusive', ) # Single-response requests: request_id -> asyncio.Future diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index 9480243e..a567191e 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -159,14 +159,16 @@ class PulsarBackend: cls, topicspace, topic = parts # Map class to Pulsar persistence and namespace - if cls in ('flow', 'state'): + if cls == 'flow': persistence = 'persistent' elif cls in ('request', 'response'): persistence = 'non-persistent' + elif cls == 'notify': + persistence = 'non-persistent' else: raise ValueError( f"Invalid queue class: {cls}, " - f"expected flow, request, response, or state" + f"expected flow, request, response, or notify" ) return f"{persistence}://{topicspace}/{cls}/{topic}" @@ -205,18 +207,20 @@ class PulsarBackend: subscription: str, schema: type, initial_position: str = 'latest', - consumer_type: str = 'shared', **options ) -> BackendConsumer: """ Create a Pulsar consumer. + Consumer type is derived from the topic's class prefix: + - flow/request: Shared (competing consumers) + - response/notify: Exclusive (per-subscriber) + Args: - topic: Generic topic format (qos/tenant/namespace/queue) + topic: Queue identifier in class:topicspace:topic format subscription: Subscription name schema: Dataclass type for messages initial_position: 'earliest' or 'latest' - consumer_type: 'shared', 'exclusive', or 'failover' **options: Backend-specific options Returns: @@ -224,17 +228,18 @@ class PulsarBackend: """ pulsar_topic = self.map_topic(topic) + # Extract class from topic for consumer type mapping + cls = topic.split(':', 1)[0] if ':' in topic else 'flow' + # Map initial position if initial_position == 'earliest': pos = pulsar.InitialPosition.Earliest else: pos = pulsar.InitialPosition.Latest - # Map consumer type - if consumer_type == 'exclusive': + # Map consumer type from class + if cls in ('response', 'notify'): ctype = pulsar.ConsumerType.Exclusive - elif consumer_type == 'failover': - ctype = pulsar.ConsumerType.Failover else: ctype = pulsar.ConsumerType.Shared diff --git a/trustgraph-base/trustgraph/base/rabbitmq_backend.py b/trustgraph-base/trustgraph/base/rabbitmq_backend.py index b9afe741..3fafcead 100644 --- a/trustgraph-base/trustgraph/base/rabbitmq_backend.py +++ b/trustgraph-base/trustgraph/base/rabbitmq_backend.py @@ -311,14 +311,14 @@ class RabbitMQBackend: cls, topicspace, topic = parts - if cls in ('flow', 'state'): + if cls == 'flow': durable = True - elif cls in ('request', 'response'): + elif cls in ('request', 'response', 'notify'): durable = False else: raise ValueError( f"Invalid queue class: {cls}, " - f"expected flow, request, response, or state" + f"expected flow, request, response, or notify" ) # Exchange per topicspace, routing key includes class @@ -345,26 +345,19 @@ class RabbitMQBackend: def create_consumer(self, topic: str, subscription: str, schema: type, initial_position: str = 'latest', - consumer_type: str = 'shared', **options) -> BackendConsumer: """Create a consumer with a queue bound to the topic exchange. - consumer_type='shared': Named durable queue. Multiple consumers - with the same subscription compete (round-robin). - consumer_type='exclusive': Anonymous ephemeral queue. Each - consumer gets its own copy of every message (broadcast). + Behaviour is determined by the topic's class prefix: + - flow: named durable queue, competing consumers (round-robin) + - request: named non-durable queue, competing consumers + - response: anonymous ephemeral queue, per-subscriber (auto-delete) + - notify: anonymous ephemeral queue, per-subscriber (auto-delete) """ exchange, routing_key, cls, durable = self._parse_queue_id(topic) - if consumer_type == 'exclusive' and cls == 'state': - # State broadcast: named durable queue per subscriber. - # Retains messages so late-starting processors see current state. - queue_name = f"{exchange}.{routing_key}.{subscription}" - queue_durable = True - exclusive = False - auto_delete = False - elif consumer_type == 'exclusive': - # Broadcast: anonymous queue, auto-deleted on disconnect + if cls in ('response', 'notify'): + # Per-subscriber: anonymous queue, auto-deleted on disconnect queue_name = '' queue_durable = False exclusive = True @@ -379,7 +372,7 @@ class RabbitMQBackend: logger.debug( f"Creating consumer: exchange={exchange}, " f"routing_key={routing_key}, queue={queue_name or '(anonymous)'}, " - f"type={consumer_type}" + f"cls={cls}" ) return RabbitMQBackendConsumer( diff --git a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py index 6cb234b1..8c68e51c 100644 --- a/trustgraph-base/trustgraph/base/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -84,7 +84,6 @@ class Subscriber: topic=self.topic, subscription=self.subscription, schema=self.schema, - consumer_type='exclusive', ), ) diff --git a/trustgraph-base/trustgraph/clients/base.py b/trustgraph-base/trustgraph/clients/base.py index cd4ad72e..02575c21 100644 --- a/trustgraph-base/trustgraph/clients/base.py +++ b/trustgraph-base/trustgraph/clients/base.py @@ -42,7 +42,6 @@ class BaseClient: topic=output_queue, subscription=subscriber, schema=output_schema, - consumer_type='shared', ) self.input_schema = input_schema diff --git a/trustgraph-base/trustgraph/schema/core/topic.py b/trustgraph-base/trustgraph/schema/core/topic.py index 036ea142..c42a6c9c 100644 --- a/trustgraph-base/trustgraph/schema/core/topic.py +++ b/trustgraph-base/trustgraph/schema/core/topic.py @@ -6,10 +6,10 @@ def queue(topic, cls='flow', topicspace='tg'): Args: topic: The logical queue name (e.g. 'config', 'librarian') cls: Queue class determining operational characteristics: - - 'flow' = persistent processing pipeline queue - - 'request' = non-persistent, short TTL request queue - - 'response' = non-persistent, short TTL response queue - - 'state' = persistent, last-value state broadcast + - 'flow' = persistent shared work queue (competing consumers) + - 'request' = non-persistent RPC request queue (shared) + - 'response' = non-persistent RPC response queue (per-subscriber) + - 'notify' = ephemeral broadcast (per-subscriber, auto-delete) topicspace: Deployment isolation prefix (default: 'tg') Returns: @@ -20,7 +20,7 @@ def queue(topic, cls='flow', topicspace='tg'): # flow:tg:text-completion-request queue('config', cls='request') # request:tg:config - queue('config', cls='state') - # state:tg:config + queue('config', cls='notify') + # notify:tg:config """ return f"{cls}:{topicspace}:{topic}" diff --git a/trustgraph-base/trustgraph/schema/services/config.py b/trustgraph-base/trustgraph/schema/services/config.py index fb219bd9..c08e96d7 100644 --- a/trustgraph-base/trustgraph/schema/services/config.py +++ b/trustgraph-base/trustgraph/schema/services/config.py @@ -62,7 +62,7 @@ class ConfigPush: config_request_queue = queue('config', cls='request') config_response_queue = queue('config', cls='response') -config_push_queue = queue('config', cls='flow') +config_push_queue = queue('config', cls='notify') ############################################################################ diff --git a/trustgraph-cli/trustgraph/cli/init_trustgraph.py b/trustgraph-cli/trustgraph/cli/init_trustgraph.py index 514dc75b..18c240ef 100644 --- a/trustgraph-cli/trustgraph/cli/init_trustgraph.py +++ b/trustgraph-cli/trustgraph/cli/init_trustgraph.py @@ -138,10 +138,10 @@ def init_pulsar(pulsar_admin_url, tenant): } }) - ensure_namespace(pulsar_admin_url, tenant, "state", { + ensure_namespace(pulsar_admin_url, tenant, "notify", { "retention_policies": { - "retentionSizeInMB": 10, - "retentionTimeInMinutes": -1, + "retentionSizeInMB": -1, + "retentionTimeInMinutes": 3, "subscriptionExpirationTimeMinutes": 5, } }) diff --git a/trustgraph-cli/trustgraph/cli/monitor_prompts.py b/trustgraph-cli/trustgraph/cli/monitor_prompts.py index 0cfe68ac..4e0e8456 100644 --- a/trustgraph-cli/trustgraph/cli/monitor_prompts.py +++ b/trustgraph-cli/trustgraph/cli/monitor_prompts.py @@ -123,7 +123,6 @@ async def monitor(flow, queue_type, max_lines, max_width, **config): topic=request_queue, subscription="prompt-monitor-req", schema=None, - consumer_type='shared', initial_position='latest', ) @@ -131,7 +130,6 @@ async def monitor(flow, queue_type, max_lines, max_width, **config): topic=response_queue, subscription="prompt-monitor-resp", schema=None, - consumer_type='shared', initial_position='latest', ) diff --git a/trustgraph-flow/trustgraph/gateway/config/receiver.py b/trustgraph-flow/trustgraph/gateway/config/receiver.py index 97f4e7eb..2323cd61 100755 --- a/trustgraph-flow/trustgraph/gateway/config/receiver.py +++ b/trustgraph-flow/trustgraph/gateway/config/receiver.py @@ -73,6 +73,31 @@ class ConfigReceiver: f"Config notify processing exception: {e}", exc_info=True ) + def _create_config_client(self): + """Create a short-lived config request/response client.""" + id = str(uuid.uuid4()) + + config_req_metrics = ProducerMetrics( + processor="api-gateway", flow=None, + name="config-request", + ) + config_resp_metrics = SubscriberMetrics( + processor="api-gateway", flow=None, + name="config-response", + ) + + return RequestResponse( + backend=self.backend, + subscription=f"api-gateway--config--{id}", + consumer_name="api-gateway", + request_topic=config_request_queue, + request_schema=ConfigRequest, + request_metrics=config_req_metrics, + response_topic=config_response_queue, + response_schema=ConfigResponse, + response_metrics=config_resp_metrics, + ) + async def fetch_and_apply(self, retry=False): """Fetch full config and apply flow changes. If retry=True, keeps retrying until successful.""" @@ -82,10 +107,15 @@ class ConfigReceiver: try: logger.info("Fetching config from config service...") - resp = await self.config_client.request( - ConfigRequest(operation="config"), - timeout=10, - ) + client = self._create_config_client() + try: + await client.start() + resp = await client.request( + ConfigRequest(operation="config"), + timeout=10, + ) + finally: + await client.stop() logger.info(f"Config response received") @@ -170,32 +200,6 @@ class ConfigReceiver: id = str(uuid.uuid4()) - # Config request/response client - config_req_metrics = ProducerMetrics( - processor="api-gateway", flow=None, - name="config-request", - ) - config_resp_metrics = SubscriberMetrics( - processor="api-gateway", flow=None, - name="config-response", - ) - - self.config_client = RequestResponse( - backend=self.backend, - subscription=f"api-gateway--config--{id}", - consumer_name="api-gateway", - request_topic=config_request_queue, - request_schema=ConfigRequest, - request_metrics=config_req_metrics, - response_topic=config_response_queue, - response_schema=ConfigResponse, - response_metrics=config_resp_metrics, - ) - - logger.info("Starting config request/response client...") - await self.config_client.start() - logger.info("Config request/response client started") - # Subscribe to notify queue self.config_cons = Consumer( taskgroup=tg,