diff --git a/tests/unit/test_extract/test_ontology/test_prompt_and_extraction.py b/tests/unit/test_extract/test_ontology/test_prompt_and_extraction.py index bae6bdbd..9f9c8551 100644 --- a/tests/unit/test_extract/test_ontology/test_prompt_and_extraction.py +++ b/tests/unit/test_extract/test_ontology/test_prompt_and_extraction.py @@ -231,52 +231,6 @@ class TestTripleValidation: is_valid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset) assert is_valid == expected, f"Validation of {predicate} should be {expected}" - def test_validates_domain_correctly_with_entity_types(self, extractor, sample_ontology_subset): - """Test domain validation correctly compares against extracted entity_types.""" - subject = "my-recipe" - predicate = "produces" - object_val = "my-food" - - # Proper domain for produces is Recipe - entity_types = { - "my-recipe": "Recipe", - "my-food": "Food" - } - - is_valid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types) - assert is_valid, "Valid domain should be accepted" - - # Invalid domain - entity_types_invalid = { - "my-recipe": "Ingredient", - "my-food": "Food" - } - is_invalid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types_invalid) - assert not is_invalid, "Invalid domain should be rejected" - - def test_validates_range_correctly_with_entity_types(self, extractor, sample_ontology_subset): - """Test range validation correctly compares against extracted entity_types.""" - subject = "my-recipe" - predicate = "produces" - object_val = "my-food" - - # Proper range for produces is Food - entity_types = { - "my-recipe": "Recipe", - "my-food": "Food" - } - - is_valid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types) - assert is_valid, "Valid range should be accepted" - - # Invalid range - entity_types_invalid = { - "my-recipe": "Recipe", - "my-food": "Recipe" - } - is_invalid = 
extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types_invalid) - assert not is_invalid, "Invalid range should be rejected" - class TestTripleParsing: """Test suite for parsing triples from LLM responses.""" diff --git a/tests/unit/test_pubsub/test_rabbitmq_backend.py b/tests/unit/test_pubsub/test_rabbitmq_backend.py index 54599723..ffe18fd7 100644 --- a/tests/unit/test_pubsub/test_rabbitmq_backend.py +++ b/tests/unit/test_pubsub/test_rabbitmq_backend.py @@ -1,5 +1,5 @@ """ -Unit tests for RabbitMQ backend — topic parsing and factory dispatch. +Unit tests for RabbitMQ backend — queue name mapping and factory dispatch. Does not require a running RabbitMQ instance. """ @@ -12,7 +12,7 @@ from trustgraph.base.rabbitmq_backend import RabbitMQBackend from trustgraph.base.pubsub import get_pubsub, add_pubsub_args -class TestRabbitMQParseTopic: +class TestRabbitMQMapQueueName: @pytest.fixture def backend(self): @@ -20,48 +20,43 @@ class TestRabbitMQParseTopic: return b def test_flow_is_durable(self, backend): - exchange, cls, durable = backend._parse_topic('flow:tg:text-completion-request') + name, durable = backend.map_queue_name('flow:tg:text-completion-request') assert durable is True - assert cls == 'flow' - assert exchange == 'tg.flow.text-completion-request' + assert name == 'tg.flow.text-completion-request' def test_notify_is_not_durable(self, backend): - exchange, cls, durable = backend._parse_topic('notify:tg:config') + name, durable = backend.map_queue_name('notify:tg:config') assert durable is False - assert cls == 'notify' - assert exchange == 'tg.notify.config' + assert name == 'tg.notify.config' def test_request_is_not_durable(self, backend): - exchange, cls, durable = backend._parse_topic('request:tg:config') + name, durable = backend.map_queue_name('request:tg:config') assert durable is False - assert cls == 'request' - assert exchange == 'tg.request.config' + assert name == 'tg.request.config' def 
test_response_is_not_durable(self, backend): - exchange, cls, durable = backend._parse_topic('response:tg:librarian') + name, durable = backend.map_queue_name('response:tg:librarian') assert durable is False - assert cls == 'response' - assert exchange == 'tg.response.librarian' + assert name == 'tg.response.librarian' def test_custom_topicspace(self, backend): - exchange, cls, durable = backend._parse_topic('flow:prod:my-queue') - assert exchange == 'prod.flow.my-queue' + name, durable = backend.map_queue_name('flow:prod:my-queue') + assert name == 'prod.flow.my-queue' assert durable is True def test_no_colon_defaults_to_flow(self, backend): - exchange, cls, durable = backend._parse_topic('simple-queue') - assert exchange == 'tg.flow.simple-queue' - assert cls == 'flow' - assert durable is True + name, durable = backend.map_queue_name('simple-queue') + assert name == 'tg.simple-queue' + assert durable is False def test_invalid_class_raises(self, backend): - with pytest.raises(ValueError, match="Invalid topic class"): - backend._parse_topic('unknown:tg:topic') + with pytest.raises(ValueError, match="Invalid queue class"): + backend.map_queue_name('unknown:tg:topic') - def test_topic_with_flow_suffix(self, backend): - """Topic names with flow suffix (e.g. :default) are preserved.""" - exchange, cls, durable = backend._parse_topic('request:tg:prompt:default') - assert exchange == 'tg.request.prompt:default' + def test_flow_with_flow_suffix(self, backend): + """Queue names with flow suffix (e.g. 
:default) are preserved.""" + name, durable = backend.map_queue_name('request:tg:prompt:default') + assert name == 'tg.request.prompt:default' class TestGetPubsubRabbitMQ: diff --git a/trustgraph-base/trustgraph/base/backend.py b/trustgraph-base/trustgraph/base/backend.py index a105ca17..0f95ca1b 100644 --- a/trustgraph-base/trustgraph/base/backend.py +++ b/trustgraph-base/trustgraph/base/backend.py @@ -121,7 +121,7 @@ class PubSubBackend(Protocol): Create a producer for a topic. Args: - topic: Queue identifier in class:topicspace:topic format + topic: Generic topic format (qos/tenant/namespace/queue) schema: Dataclass type for messages **options: Backend-specific options (e.g., chunking_enabled) @@ -159,55 +159,59 @@ class PubSubBackend(Protocol): """ ... - async def create_topic(self, topic: str) -> None: + async def create_queue(self, topic: str, subscription: str) -> None: """ - Create the broker-side resources for a logical topic. + Pre-create a queue so it exists before any consumer connects. - For RabbitMQ this creates a fanout exchange. For Pulsar this is - a no-op (topics auto-create on first use). + The topic and subscription together identify the queue, mirroring + create_consumer where the queue name is derived from both. - Idempotent — creating an already-existing topic succeeds silently. + Idempotent — creating an already-existing queue succeeds silently. Args: - topic: Topic identifier in class:topicspace:topic format + topic: Queue identifier in class:topicspace:topic format + subscription: Subscription/consumer group name """ ... - async def delete_topic(self, topic: str) -> None: + async def delete_queue(self, topic: str, subscription: str) -> None: """ - Delete a topic and discard any in-flight messages. + Delete a queue and any messages it contains. - For RabbitMQ this deletes the fanout exchange; consumer queues - lose their binding and drain naturally. 
+ The topic and subscription together identify the queue, mirroring + create_consumer where the queue name is derived from both. - Idempotent — deleting a non-existent topic succeeds silently. + Idempotent — deleting a non-existent queue succeeds silently. Args: - topic: Topic identifier in class:topicspace:topic format + topic: Queue identifier in class:topicspace:topic format + subscription: Subscription/consumer group name """ ... - async def topic_exists(self, topic: str) -> bool: + async def queue_exists(self, topic: str, subscription: str) -> bool: """ - Check whether a topic exists. + Check whether a queue exists. Args: - topic: Topic identifier in class:topicspace:topic format + topic: Queue identifier in class:topicspace:topic format + subscription: Subscription/consumer group name Returns: - True if the topic exists, False otherwise. + True if the queue exists, False otherwise. """ ... - async def ensure_topic(self, topic: str) -> None: + async def ensure_queue(self, topic: str, subscription: str) -> None: """ - Ensure a topic exists, creating it if necessary. + Ensure a queue exists, creating it if necessary. Convenience wrapper — checks existence, creates if missing. - Used by the flow service and system services on startup. + Used by system services on startup. Args: - topic: Topic identifier in class:topicspace:topic format + topic: Queue identifier in class:topicspace:topic format + subscription: Subscription/consumer group name """ ... 
diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index e27d16af..2100483d 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -266,22 +266,22 @@ class PulsarBackend: return PulsarBackendConsumer(pulsar_consumer, schema) - async def create_topic(self, topic: str) -> None: + async def create_queue(self, topic: str, subscription: str) -> None: """No-op — Pulsar auto-creates topics on first use. TODO: Use admin REST API for explicit persistent topic creation.""" pass - async def delete_topic(self, topic: str) -> None: + async def delete_queue(self, topic: str, subscription: str) -> None: """No-op — to be replaced with admin REST API calls. - TODO: Delete persistent topic via admin API.""" + TODO: Delete subscription and persistent topic via admin API.""" pass - async def topic_exists(self, topic: str) -> bool: + async def queue_exists(self, topic: str, subscription: str) -> bool: """Returns True — Pulsar auto-creates on subscribe. TODO: Use admin REST API for actual existence check.""" return True - async def ensure_topic(self, topic: str) -> None: + async def ensure_queue(self, topic: str, subscription: str) -> None: """No-op — Pulsar auto-creates topics on first use. TODO: Use admin REST API for explicit creation.""" pass diff --git a/trustgraph-base/trustgraph/base/rabbitmq_backend.py b/trustgraph-base/trustgraph/base/rabbitmq_backend.py index 73b80cb9..43c717c3 100644 --- a/trustgraph-base/trustgraph/base/rabbitmq_backend.py +++ b/trustgraph-base/trustgraph/base/rabbitmq_backend.py @@ -1,24 +1,22 @@ """ RabbitMQ backend implementation for pub/sub abstraction. -Each logical topic maps to its own fanout exchange. The exchange name -encodes the full topic identity: +Uses a single topic exchange per topicspace. The logical queue name +becomes the routing key. 
Consumer behavior is determined by the +subscription name: - class:topicspace:topic → exchange topicspace.class.topic +- Same subscription + same topic = shared queue (competing consumers) +- Different subscriptions = separate queues (broadcast / fan-out) -Producers publish to the exchange with an empty routing key. -Consumers declare and bind their own queues: - - - flow / request: named durable/non-durable queue (competing consumers) - - response / notify: anonymous exclusive auto-delete queue (per-subscriber) - -The flow service manages topic lifecycle (create/delete exchanges). -Consumers manage their own queue lifecycle (declare + bind on connect). +This mirrors Pulsar's subscription model using idiomatic RabbitMQ. Architecture: - Producer --> [fanout exchange] --> [named queue] --> Consumer - --> [named queue] --> Consumer - --> [exclusive queue] --> Subscriber + Producer --> [tg exchange] --routing key--> [named queue] --> Consumer + --routing key--> [named queue] --> Consumer + --routing key--> [exclusive q] --> Subscriber + +Uses basic_consume (push) instead of basic_get (polling) for +efficient message delivery. """ import asyncio @@ -60,16 +58,18 @@ class RabbitMQMessage: class RabbitMQBackendProducer: - """Publishes messages to a fanout exchange. + """Publishes messages to a topic exchange with a routing key. Uses thread-local connections so each thread gets its own connection/channel. This avoids wire corruption from concurrent threads writing to the same socket (pika is not thread-safe). 
""" - def __init__(self, connection_params, exchange_name, durable): + def __init__(self, connection_params, exchange_name, routing_key, + durable): self._connection_params = connection_params self._exchange_name = exchange_name + self._routing_key = routing_key self._durable = durable self._local = threading.local() @@ -90,7 +90,7 @@ class RabbitMQBackendProducer: chan = conn.channel() chan.exchange_declare( exchange=self._exchange_name, - exchange_type='fanout', + exchange_type='topic', durable=True, ) self._local.connection = conn @@ -113,7 +113,7 @@ class RabbitMQBackendProducer: channel = self._get_channel() channel.basic_publish( exchange=self._exchange_name, - routing_key='', + routing_key=self._routing_key, body=json_data.encode('utf-8'), properties=amqp_properties, ) @@ -144,17 +144,19 @@ class RabbitMQBackendProducer: class RabbitMQBackendConsumer: - """Consumes from a queue bound to a fanout exchange. + """Consumes from a queue bound to a topic exchange. Uses basic_consume (push model) with messages delivered to an internal thread-safe queue. process_data_events() drives both message delivery and heartbeat processing. 
""" - def __init__(self, connection_params, exchange_name, queue_name, - schema_cls, durable, exclusive=False, auto_delete=False): + def __init__(self, connection_params, exchange_name, routing_key, + queue_name, schema_cls, durable, exclusive=False, + auto_delete=False): self._connection_params = connection_params self._exchange_name = exchange_name + self._routing_key = routing_key self._queue_name = queue_name self._schema_cls = schema_cls self._durable = durable @@ -169,16 +171,17 @@ class RabbitMQBackendConsumer: self._connection = pika.BlockingConnection(self._connection_params) self._channel = self._connection.channel() - # Declare the fanout exchange (idempotent) + # Declare the topic exchange (idempotent, also done by producers) self._channel.exchange_declare( exchange=self._exchange_name, - exchange_type='fanout', + exchange_type='topic', durable=True, ) if self._exclusive: # Anonymous ephemeral queue (response/notify class). - # Per-consumer, broker assigns the name. + # These are per-consumer and must be created here — the + # broker assigns the name. result = self._channel.queue_declare( queue='', durable=False, @@ -186,21 +189,19 @@ class RabbitMQBackendConsumer: auto_delete=True, ) self._queue_name = result.method.queue - else: - # Named queue (flow/request class). - # Consumer owns its queue — declare and bind here. - self._channel.queue_declare( - queue=self._queue_name, - durable=self._durable, - exclusive=False, - auto_delete=False, - ) - # Bind queue to the fanout exchange - self._channel.queue_bind( - queue=self._queue_name, - exchange=self._exchange_name, - ) + self._channel.queue_bind( + queue=self._queue_name, + exchange=self._exchange_name, + routing_key=self._routing_key, + ) + else: + # Named queue (flow/request class). Queue must already + # exist — created by the flow service or ensure_queue. + # We only verify it exists (passive declare); binding is done at creation. 
+ self._channel.queue_declare( + queue=self._queue_name, passive=True, + ) self._channel.basic_qos(prefetch_count=1) @@ -317,7 +318,7 @@ class RabbitMQBackendConsumer: class RabbitMQBackend: - """RabbitMQ pub/sub backend using one fanout exchange per topic.""" + """RabbitMQ pub/sub backend using a topic exchange per topicspace.""" def __init__(self, host='localhost', port=5672, username='guest', password='guest', vhost='/'): @@ -330,23 +331,20 @@ class RabbitMQBackend: ) logger.info(f"RabbitMQ backend: {host}:{port} vhost={vhost}") - def _parse_topic(self, topic_id: str) -> tuple[str, str, bool]: + def _parse_queue_id(self, queue_id: str) -> tuple[str, str, str, bool]: """ - Parse topic identifier into exchange name and durability. + Parse queue identifier into exchange, routing key, and durability. Format: class:topicspace:topic - Returns: (exchange_name, class, durable) - - The exchange name encodes the full topic identity: - class:topicspace:topic → topicspace.class.topic + Returns: (exchange_name, routing_key, class, durable) """ - if ':' not in topic_id: - return f'tg.flow.{topic_id}', 'flow', True + if ':' not in queue_id: + return 'tg', queue_id, 'flow', False - parts = topic_id.split(':', 2) + parts = queue_id.split(':', 2) if len(parts) != 3: raise ValueError( - f"Invalid topic format: {topic_id}, " + f"Invalid queue format: {queue_id}, " f"expected class:topicspace:topic" ) @@ -358,28 +356,36 @@ class RabbitMQBackend: durable = False else: raise ValueError( - f"Invalid topic class: {cls}, " + f"Invalid queue class: {cls}, " f"expected flow, request, response, or notify" ) - exchange_name = f"{topicspace}.{cls}.{topic}" + # Exchange per topicspace, routing key includes class + exchange_name = topicspace + routing_key = f"{cls}.{topic}" - return exchange_name, cls, durable + return exchange_name, routing_key, cls, durable + + # Keep map_queue_name for backward compatibility with tests + def map_queue_name(self, queue_id: str) -> tuple[str, bool]: + 
exchange, routing_key, cls, durable = self._parse_queue_id(queue_id) + return f"{exchange}.{routing_key}", durable def create_producer(self, topic: str, schema: type, **options) -> BackendProducer: - exchange, cls, durable = self._parse_topic(topic) + exchange, routing_key, cls, durable = self._parse_queue_id(topic) logger.debug( - f"Creating producer: exchange={exchange}" + f"Creating producer: exchange={exchange}, " + f"routing_key={routing_key}" ) return RabbitMQBackendProducer( - self._connection_params, exchange, durable, + self._connection_params, exchange, routing_key, durable, ) def create_consumer(self, topic: str, subscription: str, schema: type, initial_position: str = 'latest', **options) -> BackendConsumer: - """Create a consumer with a queue bound to the topic's exchange. + """Create a consumer with a queue bound to the topic exchange. Behaviour is determined by the topic's class prefix: - flow: named durable queue, competing consumers (round-robin) @@ -387,7 +393,7 @@ class RabbitMQBackend: - response: anonymous ephemeral queue, per-subscriber (auto-delete) - notify: anonymous ephemeral queue, per-subscriber (auto-delete) """ - exchange, cls, durable = self._parse_topic(topic) + exchange, routing_key, cls, durable = self._parse_queue_id(topic) if cls in ('response', 'notify'): # Per-subscriber: anonymous queue, auto-deleted on disconnect @@ -397,33 +403,45 @@ class RabbitMQBackend: auto_delete = True else: # Shared: named queue, competing consumers - queue_name = f"{exchange}.{subscription}" + queue_name = f"{exchange}.{routing_key}.{subscription}" queue_durable = durable exclusive = False auto_delete = False logger.debug( f"Creating consumer: exchange={exchange}, " - f"queue={queue_name or '(anonymous)'}, cls={cls}" + f"routing_key={routing_key}, queue={queue_name or '(anonymous)'}, " + f"cls={cls}" ) return RabbitMQBackendConsumer( - self._connection_params, exchange, + self._connection_params, exchange, routing_key, queue_name, schema, 
queue_durable, exclusive, auto_delete, ) - def _create_topic_sync(self, exchange_name): - """Blocking exchange creation — run via asyncio.to_thread.""" + def _create_queue_sync(self, exchange, routing_key, queue_name, durable): + """Blocking queue creation — run via asyncio.to_thread.""" connection = None try: connection = pika.BlockingConnection(self._connection_params) channel = connection.channel() channel.exchange_declare( - exchange=exchange_name, - exchange_type='fanout', + exchange=exchange, + exchange_type='topic', durable=True, ) - logger.info(f"Created topic (exchange): {exchange_name}") + channel.queue_declare( + queue=queue_name, + durable=durable, + exclusive=False, + auto_delete=False, + ) + channel.queue_bind( + queue=queue_name, + exchange=exchange, + routing_key=routing_key, + ) + logger.info(f"Created queue: {queue_name}") finally: if connection and connection.is_open: try: @@ -431,30 +449,34 @@ class RabbitMQBackend: except Exception: pass - async def create_topic(self, topic: str) -> None: - """Create the fanout exchange for a logical topic. + async def create_queue(self, topic: str, subscription: str) -> None: + """Pre-create a named queue bound to the topic exchange. - Only applies to flow and request class topics. Response and - notify exchanges are created on demand by consumers. + Only applies to shared queues (flow/request class). Response and + notify queues are anonymous/auto-delete and created by consumers. 
""" - exchange, cls, durable = self._parse_topic(topic) + exchange, routing_key, cls, durable = self._parse_queue_id(topic) if cls in ('response', 'notify'): return - await asyncio.to_thread(self._create_topic_sync, exchange) + queue_name = f"{exchange}.{routing_key}.{subscription}" + await asyncio.to_thread( + self._create_queue_sync, exchange, routing_key, + queue_name, durable, + ) - def _delete_topic_sync(self, exchange_name): - """Blocking exchange deletion — run via asyncio.to_thread.""" + def _delete_queue_sync(self, queue_name): + """Blocking queue deletion — run via asyncio.to_thread.""" connection = None try: connection = pika.BlockingConnection(self._connection_params) channel = connection.channel() - channel.exchange_delete(exchange=exchange_name) - logger.info(f"Deleted topic (exchange): {exchange_name}") + channel.queue_delete(queue=queue_name) + logger.info(f"Deleted queue: {queue_name}") except Exception as e: - # Idempotent — exchange may already be gone - logger.debug(f"Exchange delete for {exchange_name}: {e}") + # Idempotent — queue may already be gone + logger.debug(f"Queue delete for {queue_name}: {e}") finally: if connection and connection.is_open: try: @@ -462,27 +484,31 @@ class RabbitMQBackend: except Exception: pass - async def delete_topic(self, topic: str) -> None: - """Delete a topic's fanout exchange. + async def delete_queue(self, topic: str, subscription: str) -> None: + """Delete a named queue and any messages it contains. - Consumer queues lose their binding and drain naturally. + Only applies to shared queues (flow/request class). Response and + notify queues are anonymous/auto-delete and managed by the broker. """ - exchange, cls, durable = self._parse_topic(topic) - await asyncio.to_thread(self._delete_topic_sync, exchange) + exchange, routing_key, cls, durable = self._parse_queue_id(topic) - def _topic_exists_sync(self, exchange_name): - """Blocking exchange existence check — run via asyncio.to_thread. 
+ if cls in ('response', 'notify'): + return + + queue_name = f"{exchange}.{routing_key}.{subscription}" + await asyncio.to_thread(self._delete_queue_sync, queue_name) + + def _queue_exists_sync(self, queue_name): + """Blocking queue existence check — run via asyncio.to_thread. Uses passive=True which checks without creating.""" connection = None try: connection = pika.BlockingConnection(self._connection_params) channel = connection.channel() - channel.exchange_declare( - exchange=exchange_name, passive=True, - ) + channel.queue_declare(queue=queue_name, passive=True) return True except pika.exceptions.ChannelClosedByBroker: - # 404 NOT_FOUND — exchange does not exist + # 404 NOT_FOUND — queue does not exist return False finally: if connection and connection.is_open: @@ -491,25 +517,26 @@ class RabbitMQBackend: except Exception: pass - async def topic_exists(self, topic: str) -> bool: - """Check whether a topic's exchange exists. + async def queue_exists(self, topic: str, subscription: str) -> bool: + """Check whether a named queue exists. - Only applies to flow and request class topics. Response and - notify topics are ephemeral — always returns False. + Only applies to shared queues (flow/request class). Response and + notify queues are anonymous/ephemeral — always returns False. 
""" - exchange, cls, durable = self._parse_topic(topic) + exchange, routing_key, cls, durable = self._parse_queue_id(topic) if cls in ('response', 'notify'): return False + queue_name = f"{exchange}.{routing_key}.{subscription}" return await asyncio.to_thread( - self._topic_exists_sync, exchange + self._queue_exists_sync, queue_name ) - async def ensure_topic(self, topic: str) -> None: - """Ensure a topic exists, creating it if necessary.""" - if not await self.topic_exists(topic): - await self.create_topic(topic) + async def ensure_queue(self, topic: str, subscription: str) -> None: + """Ensure a queue exists, creating it if necessary.""" + if not await self.queue_exists(topic, subscription): + await self.create_queue(topic, subscription) def close(self) -> None: pass diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index fe44b852..75232315 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -124,7 +124,9 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_topic(self.config_request_topic) + await self.pubsub.ensure_queue( + self.config_request_topic, self.config_request_subscriber + ) await self.push() # Startup poke: empty types = everything await self.config_request_consumer.start() diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index 93017c30..400f96d1 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -119,7 +119,9 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_topic(self.knowledge_request_topic) + await self.pubsub.ensure_queue( + self.knowledge_request_topic, self.knowledge_request_subscriber + ) await super(Processor, self).start() await self.knowledge_request_consumer.start() await self.knowledge_response_producer.start() diff 
--git a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py index e024ad40..bdb0e6e8 100644 --- a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py @@ -429,16 +429,6 @@ class Processor(FlowProcessor): validated_triples = [] ontology_id = ontology_subset.ontology_id - # Gather entity types for domain/range validation - entity_types = {} - for triple_data in triples_response: - if isinstance(triple_data, dict): - s = triple_data.get('subject', '') - p = triple_data.get('predicate', '') - o = triple_data.get('object', '') - if s and p and o and (p == "rdf:type" or p == str(RDF_TYPE)): - entity_types[s] = o - for triple_data in triples_response: try: if isinstance(triple_data, dict): @@ -450,7 +440,7 @@ class Processor(FlowProcessor): continue # Validate against ontology - if self.is_valid_triple(subject, predicate, object_val, ontology_subset, entity_types): + if self.is_valid_triple(subject, predicate, object_val, ontology_subset): # Expand URIs before creating Value objects subject_uri = self.expand_uri(subject, ontology_subset, ontology_id) predicate_uri = self.expand_uri(predicate, ontology_subset, ontology_id) @@ -503,11 +493,8 @@ class Processor(FlowProcessor): return False def is_valid_triple(self, subject: str, predicate: str, object_val: str, - ontology_subset: OntologySubset, entity_types: dict = None) -> bool: + ontology_subset: OntologySubset) -> bool: """Validate triple against ontology constraints.""" - if entity_types is None: - entity_types = {} - # Special case for rdf:type if predicate == "rdf:type" or predicate == str(RDF_TYPE): # Check if object is a valid class @@ -524,45 +511,7 @@ class Processor(FlowProcessor): if not is_obj_prop and not is_dt_prop: return False # Unknown property - prop_def = ontology_subset.object_properties[predicate] if is_obj_prop else ontology_subset.datatype_properties[predicate] - if 
not isinstance(prop_def, dict): - prop_def = prop_def.__dict__ if hasattr(prop_def, '__dict__') else {} - - # Domain validation - expected_domain = prop_def.get('domain') - if expected_domain and subject in entity_types: - actual_domain = entity_types[subject] - if actual_domain != expected_domain: - is_subclass = False - curr_class = actual_domain - while curr_class in ontology_subset.classes: - cls_def = ontology_subset.classes[curr_class] - parent = cls_def.get('subclass_of') if isinstance(cls_def, dict) else None - if parent == expected_domain: - is_subclass = True - break - curr_class = parent - if not is_subclass: - return False - - # Range validation - if is_obj_prop: - expected_range = prop_def.get('range') - if expected_range and object_val in entity_types: - actual_range = entity_types[object_val] - if actual_range != expected_range: - is_subclass = False - curr_class = actual_range - while curr_class in ontology_subset.classes: - cls_def = ontology_subset.classes[curr_class] - parent = cls_def.get('subclass_of') if isinstance(cls_def, dict) else None - if parent == expected_range: - is_subclass = True - break - curr_class = parent - if not is_subclass: - return False - + # TODO: Add more sophisticated validation (domain/range checking) return True def expand_uri(self, value: str, ontology_subset: OntologySubset, ontology_id: str = "unknown") -> str: diff --git a/trustgraph-flow/trustgraph/flow/service/flow.py b/trustgraph-flow/trustgraph/flow/service/flow.py index b864faf9..477c6a2c 100644 --- a/trustgraph-flow/trustgraph/flow/service/flow.py +++ b/trustgraph-flow/trustgraph/flow/service/flow.py @@ -7,7 +7,7 @@ import logging # Module logger logger = logging.getLogger(__name__) -# Topic deletion retry settings +# Queue deletion retry settings DELETE_RETRIES = 5 DELETE_RETRY_DELAY = 2 # seconds @@ -215,11 +215,11 @@ class FlowConfig: return result - # Pre-create topic exchanges so the data path is wired + # Pre-create flow-level queues so the data path is 
wired # before processors receive their config and start connecting. - topics = self._collect_flow_topics(cls, repl_template_with_params) - for topic in topics: - await self.pubsub.create_topic(topic) + queues = self._collect_flow_queues(cls, repl_template_with_params) + for topic, subscription in queues: + await self.pubsub.create_queue(topic, subscription) # Build all processor config updates, then write in a single batch. updates = [] @@ -283,8 +283,8 @@ class FlowConfig: error = None, ) - async def ensure_existing_flow_topics(self): - """Ensure topics exist for all already-running flows. + async def ensure_existing_flow_queues(self): + """Ensure queues exist for all already-running flows. Called on startup to handle flows that were started before this version of the flow service was deployed, or before a restart. @@ -315,7 +315,7 @@ class FlowConfig: if blueprint_data is None: logger.warning( f"Blueprint '{blueprint_name}' not found for " - f"flow '{flow_id}', skipping topic creation" + f"flow '{flow_id}', skipping queue creation" ) continue @@ -333,63 +333,65 @@ class FlowConfig: ) return result - topics = self._collect_flow_topics(cls, repl_template) - for topic in topics: - await self.pubsub.ensure_topic(topic) + queues = self._collect_flow_queues(cls, repl_template) + for topic, subscription in queues: + await self.pubsub.ensure_queue(topic, subscription) logger.info( - f"Ensured topics for existing flow '{flow_id}'" + f"Ensured queues for existing flow '{flow_id}'" ) except Exception as e: logger.error( - f"Failed to ensure topics for flow '{flow_id}': {e}" + f"Failed to ensure queues for flow '{flow_id}': {e}" ) - def _collect_flow_topics(self, cls, repl_template): - """Collect unique topic identifiers from the blueprint. + def _collect_flow_queues(self, cls, repl_template): + """Collect (topic, subscription) pairs for all flow-level queues. - Iterates the blueprint's "flow" section and returns a - deduplicated set of resolved topic strings. 
The flow service - manages topic lifecycle (create/delete exchanges), not - individual consumer queues. + Iterates the blueprint's "flow" section and reads only the + "topics" dict from each processor entry. """ - topics = set() + queues = [] for k, v in cls["flow"].items(): + processor, variant = k.split(":", 1) + variant = repl_template(variant) + for spec_name, topic_template in v.get("topics", {}).items(): topic = repl_template(topic_template) - topics.add(topic) + subscription = f"{processor}--{variant}--{spec_name}" + queues.append((topic, subscription)) - return topics + return queues - async def _delete_topics(self, topics): - """Delete topics with retries. Best-effort — logs failures but + async def _delete_queues(self, queues): + """Delete queues with retries. Best-effort — logs failures but does not raise.""" for attempt in range(DELETE_RETRIES): remaining = [] - for topic in topics: + for topic, subscription in queues: try: - await self.pubsub.delete_topic(topic) + await self.pubsub.delete_queue(topic, subscription) except Exception as e: logger.warning( - f"Topic delete failed (attempt {attempt + 1}/" + f"Queue delete failed (attempt {attempt + 1}/" f"{DELETE_RETRIES}): {topic}: {e}" ) - remaining.append(topic) + remaining.append((topic, subscription)) if not remaining: return - topics = remaining + queues = remaining if attempt < DELETE_RETRIES - 1: await asyncio.sleep(DELETE_RETRY_DELAY) - for topic in topics: + for topic, subscription in queues: logger.error( - f"Failed to delete topic after {DELETE_RETRIES} " + f"Failed to delete queue after {DELETE_RETRIES} " f"attempts: {topic}" ) @@ -424,8 +426,8 @@ class FlowConfig: result = result.replace(f"{{{param_name}}}", str(param_value)) return result - # Collect topic identifiers before removing config - topics = self._collect_flow_topics(cls, repl_template) + # Collect queue identifiers before removing config + queues = self._collect_flow_queues(cls, repl_template) # Phase 1: Set status to "stopping" 
and remove processor config. # The config push tells processors to shut down their consumers. @@ -446,8 +448,8 @@ class FlowConfig: await self.config.delete_many(deletes) - # Phase 2: Delete topics with retries, then remove the flow record. - await self._delete_topics(topics) + # Phase 2: Delete queues with retries, then remove the flow record. + await self._delete_queues(queues) if msg.flow_id in await self.config.keys("flow"): await self.config.delete("flow", msg.flow_id) diff --git a/trustgraph-flow/trustgraph/flow/service/service.py b/trustgraph-flow/trustgraph/flow/service/service.py index e1997452..a3f2fb6b 100644 --- a/trustgraph-flow/trustgraph/flow/service/service.py +++ b/trustgraph-flow/trustgraph/flow/service/service.py @@ -101,9 +101,11 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_topic(self.flow_request_topic) + await self.pubsub.ensure_queue( + self.flow_request_topic, self.flow_request_subscriber + ) await self.config_client.start() - await self.flow.ensure_existing_flow_topics() + await self.flow.ensure_existing_flow_queues() await self.flow_request_consumer.start() async def on_flow_request(self, msg, consumer, flow): diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index ed005298..83f97bf3 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -263,8 +263,12 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_topic(self.librarian_request_topic) - await self.pubsub.ensure_topic(self.collection_request_topic) + await self.pubsub.ensure_queue( + self.librarian_request_topic, self.librarian_request_subscriber + ) + await self.pubsub.ensure_queue( + self.collection_request_topic, self.collection_request_subscriber + ) await super(Processor, self).start() await self.librarian_request_consumer.start() await self.librarian_response_producer.start()