Compare commits

..

No commits in common. "3505bfdd2521eeccc606e50ff00fa5ed60f9b286" and "9f84891fccf3473eb4afe7432f98a1be0312402f" have entirely different histories.

11 changed files with 232 additions and 291 deletions

View file

@ -231,52 +231,6 @@ class TestTripleValidation:
is_valid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset)
assert is_valid == expected, f"Validation of {predicate} should be {expected}"
def test_validates_domain_correctly_with_entity_types(self, extractor, sample_ontology_subset):
"""Test domain validation correctly compares against extracted entity_types."""
subject = "my-recipe"
predicate = "produces"
object_val = "my-food"
# Proper domain for produces is Recipe
entity_types = {
"my-recipe": "Recipe",
"my-food": "Food"
}
is_valid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types)
assert is_valid, "Valid domain should be accepted"
# Invalid domain
entity_types_invalid = {
"my-recipe": "Ingredient",
"my-food": "Food"
}
is_invalid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types_invalid)
assert not is_invalid, "Invalid domain should be rejected"
def test_validates_range_correctly_with_entity_types(self, extractor, sample_ontology_subset):
"""Test range validation correctly compares against extracted entity_types."""
subject = "my-recipe"
predicate = "produces"
object_val = "my-food"
# Proper range for produces is Food
entity_types = {
"my-recipe": "Recipe",
"my-food": "Food"
}
is_valid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types)
assert is_valid, "Valid range should be accepted"
# Invalid range
entity_types_invalid = {
"my-recipe": "Recipe",
"my-food": "Recipe"
}
is_invalid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types_invalid)
assert not is_invalid, "Invalid range should be rejected"
class TestTripleParsing:
"""Test suite for parsing triples from LLM responses."""

View file

@ -1,5 +1,5 @@
"""
Unit tests for RabbitMQ backend topic parsing and factory dispatch.
Unit tests for RabbitMQ backend queue name mapping and factory dispatch.
Does not require a running RabbitMQ instance.
"""
@ -12,7 +12,7 @@ from trustgraph.base.rabbitmq_backend import RabbitMQBackend
from trustgraph.base.pubsub import get_pubsub, add_pubsub_args
class TestRabbitMQParseTopic:
class TestRabbitMQMapQueueName:
@pytest.fixture
def backend(self):
@ -20,48 +20,43 @@ class TestRabbitMQParseTopic:
return b
def test_flow_is_durable(self, backend):
exchange, cls, durable = backend._parse_topic('flow:tg:text-completion-request')
name, durable = backend.map_queue_name('flow:tg:text-completion-request')
assert durable is True
assert cls == 'flow'
assert exchange == 'tg.flow.text-completion-request'
assert name == 'tg.flow.text-completion-request'
def test_notify_is_not_durable(self, backend):
exchange, cls, durable = backend._parse_topic('notify:tg:config')
name, durable = backend.map_queue_name('notify:tg:config')
assert durable is False
assert cls == 'notify'
assert exchange == 'tg.notify.config'
assert name == 'tg.notify.config'
def test_request_is_not_durable(self, backend):
exchange, cls, durable = backend._parse_topic('request:tg:config')
name, durable = backend.map_queue_name('request:tg:config')
assert durable is False
assert cls == 'request'
assert exchange == 'tg.request.config'
assert name == 'tg.request.config'
def test_response_is_not_durable(self, backend):
exchange, cls, durable = backend._parse_topic('response:tg:librarian')
name, durable = backend.map_queue_name('response:tg:librarian')
assert durable is False
assert cls == 'response'
assert exchange == 'tg.response.librarian'
assert name == 'tg.response.librarian'
def test_custom_topicspace(self, backend):
exchange, cls, durable = backend._parse_topic('flow:prod:my-queue')
assert exchange == 'prod.flow.my-queue'
name, durable = backend.map_queue_name('flow:prod:my-queue')
assert name == 'prod.flow.my-queue'
assert durable is True
def test_no_colon_defaults_to_flow(self, backend):
exchange, cls, durable = backend._parse_topic('simple-queue')
assert exchange == 'tg.flow.simple-queue'
assert cls == 'flow'
assert durable is True
name, durable = backend.map_queue_name('simple-queue')
assert name == 'tg.simple-queue'
assert durable is False
def test_invalid_class_raises(self, backend):
with pytest.raises(ValueError, match="Invalid topic class"):
backend._parse_topic('unknown:tg:topic')
with pytest.raises(ValueError, match="Invalid queue class"):
backend.map_queue_name('unknown:tg:topic')
def test_topic_with_flow_suffix(self, backend):
"""Topic names with flow suffix (e.g. :default) are preserved."""
exchange, cls, durable = backend._parse_topic('request:tg:prompt:default')
assert exchange == 'tg.request.prompt:default'
def test_flow_with_flow_suffix(self, backend):
"""Queue names with flow suffix (e.g. :default) are preserved."""
name, durable = backend.map_queue_name('request:tg:prompt:default')
assert name == 'tg.request.prompt:default'
class TestGetPubsubRabbitMQ:

View file

@ -121,7 +121,7 @@ class PubSubBackend(Protocol):
Create a producer for a topic.
Args:
topic: Queue identifier in class:topicspace:topic format
topic: Generic topic format (qos/tenant/namespace/queue)
schema: Dataclass type for messages
**options: Backend-specific options (e.g., chunking_enabled)
@ -159,55 +159,59 @@ class PubSubBackend(Protocol):
"""
...
async def create_topic(self, topic: str) -> None:
async def create_queue(self, topic: str, subscription: str) -> None:
"""
Create the broker-side resources for a logical topic.
Pre-create a queue so it exists before any consumer connects.
For RabbitMQ this creates a fanout exchange. For Pulsar this is
a no-op (topics auto-create on first use).
The topic and subscription together identify the queue, mirroring
create_consumer where the queue name is derived from both.
Idempotent creating an already-existing topic succeeds silently.
Idempotent creating an already-existing queue succeeds silently.
Args:
topic: Topic identifier in class:topicspace:topic format
topic: Queue identifier in class:topicspace:topic format
subscription: Subscription/consumer group name
"""
...
async def delete_topic(self, topic: str) -> None:
async def delete_queue(self, topic: str, subscription: str) -> None:
"""
Delete a topic and discard any in-flight messages.
Delete a queue and any messages it contains.
For RabbitMQ this deletes the fanout exchange; consumer queues
lose their binding and drain naturally.
The topic and subscription together identify the queue, mirroring
create_consumer where the queue name is derived from both.
Idempotent deleting a non-existent topic succeeds silently.
Idempotent deleting a non-existent queue succeeds silently.
Args:
topic: Topic identifier in class:topicspace:topic format
topic: Queue identifier in class:topicspace:topic format
subscription: Subscription/consumer group name
"""
...
async def topic_exists(self, topic: str) -> bool:
async def queue_exists(self, topic: str, subscription: str) -> bool:
"""
Check whether a topic exists.
Check whether a queue exists.
Args:
topic: Topic identifier in class:topicspace:topic format
topic: Queue identifier in class:topicspace:topic format
subscription: Subscription/consumer group name
Returns:
True if the topic exists, False otherwise.
True if the queue exists, False otherwise.
"""
...
async def ensure_topic(self, topic: str) -> None:
async def ensure_queue(self, topic: str, subscription: str) -> None:
"""
Ensure a topic exists, creating it if necessary.
Ensure a queue exists, creating it if necessary.
Convenience wrapper checks existence, creates if missing.
Used by the flow service and system services on startup.
Used by system services on startup.
Args:
topic: Topic identifier in class:topicspace:topic format
topic: Queue identifier in class:topicspace:topic format
subscription: Subscription/consumer group name
"""
...

View file

@ -266,22 +266,22 @@ class PulsarBackend:
return PulsarBackendConsumer(pulsar_consumer, schema)
async def create_topic(self, topic: str) -> None:
async def create_queue(self, topic: str, subscription: str) -> None:
"""No-op — Pulsar auto-creates topics on first use.
TODO: Use admin REST API for explicit persistent topic creation."""
pass
async def delete_topic(self, topic: str) -> None:
async def delete_queue(self, topic: str, subscription: str) -> None:
"""No-op — to be replaced with admin REST API calls.
TODO: Delete persistent topic via admin API."""
TODO: Delete subscription and persistent topic via admin API."""
pass
async def topic_exists(self, topic: str) -> bool:
async def queue_exists(self, topic: str, subscription: str) -> bool:
"""Returns True — Pulsar auto-creates on subscribe.
TODO: Use admin REST API for actual existence check."""
return True
async def ensure_topic(self, topic: str) -> None:
async def ensure_queue(self, topic: str, subscription: str) -> None:
"""No-op — Pulsar auto-creates topics on first use.
TODO: Use admin REST API for explicit creation."""
pass

View file

@ -1,24 +1,22 @@
"""
RabbitMQ backend implementation for pub/sub abstraction.
Each logical topic maps to its own fanout exchange. The exchange name
encodes the full topic identity:
Uses a single topic exchange per topicspace. The logical queue name
becomes the routing key. Consumer behavior is determined by the
subscription name:
class:topicspace:topic exchange topicspace.class.topic
- Same subscription + same topic = shared queue (competing consumers)
- Different subscriptions = separate queues (broadcast / fan-out)
Producers publish to the exchange with an empty routing key.
Consumers declare and bind their own queues:
- flow / request: named durable/non-durable queue (competing consumers)
- response / notify: anonymous exclusive auto-delete queue (per-subscriber)
The flow service manages topic lifecycle (create/delete exchanges).
Consumers manage their own queue lifecycle (declare + bind on connect).
This mirrors Pulsar's subscription model using idiomatic RabbitMQ.
Architecture:
Producer --> [fanout exchange] --> [named queue] --> Consumer
--> [named queue] --> Consumer
--> [exclusive queue] --> Subscriber
Producer --> [tg exchange] --routing key--> [named queue] --> Consumer
--routing key--> [named queue] --> Consumer
--routing key--> [exclusive q] --> Subscriber
Uses basic_consume (push) instead of basic_get (polling) for
efficient message delivery.
"""
import asyncio
@ -60,16 +58,18 @@ class RabbitMQMessage:
class RabbitMQBackendProducer:
"""Publishes messages to a fanout exchange.
"""Publishes messages to a topic exchange with a routing key.
Uses thread-local connections so each thread gets its own
connection/channel. This avoids wire corruption from concurrent
threads writing to the same socket (pika is not thread-safe).
"""
def __init__(self, connection_params, exchange_name, durable):
def __init__(self, connection_params, exchange_name, routing_key,
durable):
self._connection_params = connection_params
self._exchange_name = exchange_name
self._routing_key = routing_key
self._durable = durable
self._local = threading.local()
@ -90,7 +90,7 @@ class RabbitMQBackendProducer:
chan = conn.channel()
chan.exchange_declare(
exchange=self._exchange_name,
exchange_type='fanout',
exchange_type='topic',
durable=True,
)
self._local.connection = conn
@ -113,7 +113,7 @@ class RabbitMQBackendProducer:
channel = self._get_channel()
channel.basic_publish(
exchange=self._exchange_name,
routing_key='',
routing_key=self._routing_key,
body=json_data.encode('utf-8'),
properties=amqp_properties,
)
@ -144,17 +144,19 @@ class RabbitMQBackendProducer:
class RabbitMQBackendConsumer:
"""Consumes from a queue bound to a fanout exchange.
"""Consumes from a queue bound to a topic exchange.
Uses basic_consume (push model) with messages delivered to an
internal thread-safe queue. process_data_events() drives both
message delivery and heartbeat processing.
"""
def __init__(self, connection_params, exchange_name, queue_name,
schema_cls, durable, exclusive=False, auto_delete=False):
def __init__(self, connection_params, exchange_name, routing_key,
queue_name, schema_cls, durable, exclusive=False,
auto_delete=False):
self._connection_params = connection_params
self._exchange_name = exchange_name
self._routing_key = routing_key
self._queue_name = queue_name
self._schema_cls = schema_cls
self._durable = durable
@ -169,16 +171,17 @@ class RabbitMQBackendConsumer:
self._connection = pika.BlockingConnection(self._connection_params)
self._channel = self._connection.channel()
# Declare the fanout exchange (idempotent)
# Declare the topic exchange (idempotent, also done by producers)
self._channel.exchange_declare(
exchange=self._exchange_name,
exchange_type='fanout',
exchange_type='topic',
durable=True,
)
if self._exclusive:
# Anonymous ephemeral queue (response/notify class).
# Per-consumer, broker assigns the name.
# These are per-consumer and must be created here — the
# broker assigns the name.
result = self._channel.queue_declare(
queue='',
durable=False,
@ -186,21 +189,19 @@ class RabbitMQBackendConsumer:
auto_delete=True,
)
self._queue_name = result.method.queue
else:
# Named queue (flow/request class).
# Consumer owns its queue — declare and bind here.
self._channel.queue_declare(
queue=self._queue_name,
durable=self._durable,
exclusive=False,
auto_delete=False,
)
# Bind queue to the fanout exchange
self._channel.queue_bind(
queue=self._queue_name,
exchange=self._exchange_name,
)
self._channel.queue_bind(
queue=self._queue_name,
exchange=self._exchange_name,
routing_key=self._routing_key,
)
else:
# Named queue (flow/request class). Queue must already
# exist — created by the flow service or ensure_queue.
# We just verify it exists and bind to consume.
self._channel.queue_declare(
queue=self._queue_name, passive=True,
)
self._channel.basic_qos(prefetch_count=1)
@ -317,7 +318,7 @@ class RabbitMQBackendConsumer:
class RabbitMQBackend:
"""RabbitMQ pub/sub backend using one fanout exchange per topic."""
"""RabbitMQ pub/sub backend using a topic exchange per topicspace."""
def __init__(self, host='localhost', port=5672, username='guest',
password='guest', vhost='/'):
@ -330,23 +331,20 @@ class RabbitMQBackend:
)
logger.info(f"RabbitMQ backend: {host}:{port} vhost={vhost}")
def _parse_topic(self, topic_id: str) -> tuple[str, str, bool]:
def _parse_queue_id(self, queue_id: str) -> tuple[str, str, str, bool]:
"""
Parse topic identifier into exchange name and durability.
Parse queue identifier into exchange, routing key, and durability.
Format: class:topicspace:topic
Returns: (exchange_name, class, durable)
The exchange name encodes the full topic identity:
class:topicspace:topic topicspace.class.topic
Returns: (exchange_name, routing_key, class, durable)
"""
if ':' not in topic_id:
return f'tg.flow.{topic_id}', 'flow', True
if ':' not in queue_id:
return 'tg', queue_id, 'flow', False
parts = topic_id.split(':', 2)
parts = queue_id.split(':', 2)
if len(parts) != 3:
raise ValueError(
f"Invalid topic format: {topic_id}, "
f"Invalid queue format: {queue_id}, "
f"expected class:topicspace:topic"
)
@ -358,28 +356,36 @@ class RabbitMQBackend:
durable = False
else:
raise ValueError(
f"Invalid topic class: {cls}, "
f"Invalid queue class: {cls}, "
f"expected flow, request, response, or notify"
)
exchange_name = f"{topicspace}.{cls}.{topic}"
# Exchange per topicspace, routing key includes class
exchange_name = topicspace
routing_key = f"{cls}.{topic}"
return exchange_name, cls, durable
return exchange_name, routing_key, cls, durable
# Keep map_queue_name for backward compatibility with tests
def map_queue_name(self, queue_id: str) -> tuple[str, bool]:
exchange, routing_key, cls, durable = self._parse_queue_id(queue_id)
return f"{exchange}.{routing_key}", durable
def create_producer(self, topic: str, schema: type,
**options) -> BackendProducer:
exchange, cls, durable = self._parse_topic(topic)
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
logger.debug(
f"Creating producer: exchange={exchange}"
f"Creating producer: exchange={exchange}, "
f"routing_key={routing_key}"
)
return RabbitMQBackendProducer(
self._connection_params, exchange, durable,
self._connection_params, exchange, routing_key, durable,
)
def create_consumer(self, topic: str, subscription: str, schema: type,
initial_position: str = 'latest',
**options) -> BackendConsumer:
"""Create a consumer with a queue bound to the topic's exchange.
"""Create a consumer with a queue bound to the topic exchange.
Behaviour is determined by the topic's class prefix:
- flow: named durable queue, competing consumers (round-robin)
@ -387,7 +393,7 @@ class RabbitMQBackend:
- response: anonymous ephemeral queue, per-subscriber (auto-delete)
- notify: anonymous ephemeral queue, per-subscriber (auto-delete)
"""
exchange, cls, durable = self._parse_topic(topic)
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
if cls in ('response', 'notify'):
# Per-subscriber: anonymous queue, auto-deleted on disconnect
@ -397,33 +403,45 @@ class RabbitMQBackend:
auto_delete = True
else:
# Shared: named queue, competing consumers
queue_name = f"{exchange}.{subscription}"
queue_name = f"{exchange}.{routing_key}.{subscription}"
queue_durable = durable
exclusive = False
auto_delete = False
logger.debug(
f"Creating consumer: exchange={exchange}, "
f"queue={queue_name or '(anonymous)'}, cls={cls}"
f"routing_key={routing_key}, queue={queue_name or '(anonymous)'}, "
f"cls={cls}"
)
return RabbitMQBackendConsumer(
self._connection_params, exchange,
self._connection_params, exchange, routing_key,
queue_name, schema, queue_durable, exclusive, auto_delete,
)
def _create_topic_sync(self, exchange_name):
"""Blocking exchange creation — run via asyncio.to_thread."""
def _create_queue_sync(self, exchange, routing_key, queue_name, durable):
"""Blocking queue creation — run via asyncio.to_thread."""
connection = None
try:
connection = pika.BlockingConnection(self._connection_params)
channel = connection.channel()
channel.exchange_declare(
exchange=exchange_name,
exchange_type='fanout',
exchange=exchange,
exchange_type='topic',
durable=True,
)
logger.info(f"Created topic (exchange): {exchange_name}")
channel.queue_declare(
queue=queue_name,
durable=durable,
exclusive=False,
auto_delete=False,
)
channel.queue_bind(
queue=queue_name,
exchange=exchange,
routing_key=routing_key,
)
logger.info(f"Created queue: {queue_name}")
finally:
if connection and connection.is_open:
try:
@ -431,30 +449,34 @@ class RabbitMQBackend:
except Exception:
pass
async def create_topic(self, topic: str) -> None:
"""Create the fanout exchange for a logical topic.
async def create_queue(self, topic: str, subscription: str) -> None:
"""Pre-create a named queue bound to the topic exchange.
Only applies to flow and request class topics. Response and
notify exchanges are created on demand by consumers.
Only applies to shared queues (flow/request class). Response and
notify queues are anonymous/auto-delete and created by consumers.
"""
exchange, cls, durable = self._parse_topic(topic)
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
if cls in ('response', 'notify'):
return
await asyncio.to_thread(self._create_topic_sync, exchange)
queue_name = f"{exchange}.{routing_key}.{subscription}"
await asyncio.to_thread(
self._create_queue_sync, exchange, routing_key,
queue_name, durable,
)
def _delete_topic_sync(self, exchange_name):
"""Blocking exchange deletion — run via asyncio.to_thread."""
def _delete_queue_sync(self, queue_name):
"""Blocking queue deletion — run via asyncio.to_thread."""
connection = None
try:
connection = pika.BlockingConnection(self._connection_params)
channel = connection.channel()
channel.exchange_delete(exchange=exchange_name)
logger.info(f"Deleted topic (exchange): {exchange_name}")
channel.queue_delete(queue=queue_name)
logger.info(f"Deleted queue: {queue_name}")
except Exception as e:
# Idempotent — exchange may already be gone
logger.debug(f"Exchange delete for {exchange_name}: {e}")
# Idempotent — queue may already be gone
logger.debug(f"Queue delete for {queue_name}: {e}")
finally:
if connection and connection.is_open:
try:
@ -462,27 +484,31 @@ class RabbitMQBackend:
except Exception:
pass
async def delete_topic(self, topic: str) -> None:
"""Delete a topic's fanout exchange.
async def delete_queue(self, topic: str, subscription: str) -> None:
"""Delete a named queue and any messages it contains.
Consumer queues lose their binding and drain naturally.
Only applies to shared queues (flow/request class). Response and
notify queues are anonymous/auto-delete and managed by the broker.
"""
exchange, cls, durable = self._parse_topic(topic)
await asyncio.to_thread(self._delete_topic_sync, exchange)
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
def _topic_exists_sync(self, exchange_name):
"""Blocking exchange existence check — run via asyncio.to_thread.
if cls in ('response', 'notify'):
return
queue_name = f"{exchange}.{routing_key}.{subscription}"
await asyncio.to_thread(self._delete_queue_sync, queue_name)
def _queue_exists_sync(self, queue_name):
"""Blocking queue existence check — run via asyncio.to_thread.
Uses passive=True which checks without creating."""
connection = None
try:
connection = pika.BlockingConnection(self._connection_params)
channel = connection.channel()
channel.exchange_declare(
exchange=exchange_name, passive=True,
)
channel.queue_declare(queue=queue_name, passive=True)
return True
except pika.exceptions.ChannelClosedByBroker:
# 404 NOT_FOUND — exchange does not exist
# 404 NOT_FOUND — queue does not exist
return False
finally:
if connection and connection.is_open:
@ -491,25 +517,26 @@ class RabbitMQBackend:
except Exception:
pass
async def topic_exists(self, topic: str) -> bool:
"""Check whether a topic's exchange exists.
async def queue_exists(self, topic: str, subscription: str) -> bool:
"""Check whether a named queue exists.
Only applies to flow and request class topics. Response and
notify topics are ephemeral always returns False.
Only applies to shared queues (flow/request class). Response and
notify queues are anonymous/ephemeral always returns False.
"""
exchange, cls, durable = self._parse_topic(topic)
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
if cls in ('response', 'notify'):
return False
queue_name = f"{exchange}.{routing_key}.{subscription}"
return await asyncio.to_thread(
self._topic_exists_sync, exchange
self._queue_exists_sync, queue_name
)
async def ensure_topic(self, topic: str) -> None:
"""Ensure a topic exists, creating it if necessary."""
if not await self.topic_exists(topic):
await self.create_topic(topic)
async def ensure_queue(self, topic: str, subscription: str) -> None:
"""Ensure a queue exists, creating it if necessary."""
if not await self.queue_exists(topic, subscription):
await self.create_queue(topic, subscription)
def close(self) -> None:
pass

View file

@ -124,7 +124,9 @@ class Processor(AsyncProcessor):
async def start(self):
await self.pubsub.ensure_topic(self.config_request_topic)
await self.pubsub.ensure_queue(
self.config_request_topic, self.config_request_subscriber
)
await self.push() # Startup poke: empty types = everything
await self.config_request_consumer.start()

View file

@ -119,7 +119,9 @@ class Processor(AsyncProcessor):
async def start(self):
await self.pubsub.ensure_topic(self.knowledge_request_topic)
await self.pubsub.ensure_queue(
self.knowledge_request_topic, self.knowledge_request_subscriber
)
await super(Processor, self).start()
await self.knowledge_request_consumer.start()
await self.knowledge_response_producer.start()

View file

@ -429,16 +429,6 @@ class Processor(FlowProcessor):
validated_triples = []
ontology_id = ontology_subset.ontology_id
# Gather entity types for domain/range validation
entity_types = {}
for triple_data in triples_response:
if isinstance(triple_data, dict):
s = triple_data.get('subject', '')
p = triple_data.get('predicate', '')
o = triple_data.get('object', '')
if s and p and o and (p == "rdf:type" or p == str(RDF_TYPE)):
entity_types[s] = o
for triple_data in triples_response:
try:
if isinstance(triple_data, dict):
@ -450,7 +440,7 @@ class Processor(FlowProcessor):
continue
# Validate against ontology
if self.is_valid_triple(subject, predicate, object_val, ontology_subset, entity_types):
if self.is_valid_triple(subject, predicate, object_val, ontology_subset):
# Expand URIs before creating Value objects
subject_uri = self.expand_uri(subject, ontology_subset, ontology_id)
predicate_uri = self.expand_uri(predicate, ontology_subset, ontology_id)
@ -503,11 +493,8 @@ class Processor(FlowProcessor):
return False
def is_valid_triple(self, subject: str, predicate: str, object_val: str,
ontology_subset: OntologySubset, entity_types: dict = None) -> bool:
ontology_subset: OntologySubset) -> bool:
"""Validate triple against ontology constraints."""
if entity_types is None:
entity_types = {}
# Special case for rdf:type
if predicate == "rdf:type" or predicate == str(RDF_TYPE):
# Check if object is a valid class
@ -524,45 +511,7 @@ class Processor(FlowProcessor):
if not is_obj_prop and not is_dt_prop:
return False # Unknown property
prop_def = ontology_subset.object_properties[predicate] if is_obj_prop else ontology_subset.datatype_properties[predicate]
if not isinstance(prop_def, dict):
prop_def = prop_def.__dict__ if hasattr(prop_def, '__dict__') else {}
# Domain validation
expected_domain = prop_def.get('domain')
if expected_domain and subject in entity_types:
actual_domain = entity_types[subject]
if actual_domain != expected_domain:
is_subclass = False
curr_class = actual_domain
while curr_class in ontology_subset.classes:
cls_def = ontology_subset.classes[curr_class]
parent = cls_def.get('subclass_of') if isinstance(cls_def, dict) else None
if parent == expected_domain:
is_subclass = True
break
curr_class = parent
if not is_subclass:
return False
# Range validation
if is_obj_prop:
expected_range = prop_def.get('range')
if expected_range and object_val in entity_types:
actual_range = entity_types[object_val]
if actual_range != expected_range:
is_subclass = False
curr_class = actual_range
while curr_class in ontology_subset.classes:
cls_def = ontology_subset.classes[curr_class]
parent = cls_def.get('subclass_of') if isinstance(cls_def, dict) else None
if parent == expected_range:
is_subclass = True
break
curr_class = parent
if not is_subclass:
return False
# TODO: Add more sophisticated validation (domain/range checking)
return True
def expand_uri(self, value: str, ontology_subset: OntologySubset, ontology_id: str = "unknown") -> str:

View file

@ -7,7 +7,7 @@ import logging
# Module logger
logger = logging.getLogger(__name__)
# Topic deletion retry settings
# Queue deletion retry settings
DELETE_RETRIES = 5
DELETE_RETRY_DELAY = 2 # seconds
@ -215,11 +215,11 @@ class FlowConfig:
return result
# Pre-create topic exchanges so the data path is wired
# Pre-create flow-level queues so the data path is wired
# before processors receive their config and start connecting.
topics = self._collect_flow_topics(cls, repl_template_with_params)
for topic in topics:
await self.pubsub.create_topic(topic)
queues = self._collect_flow_queues(cls, repl_template_with_params)
for topic, subscription in queues:
await self.pubsub.create_queue(topic, subscription)
# Build all processor config updates, then write in a single batch.
updates = []
@ -283,8 +283,8 @@ class FlowConfig:
error = None,
)
async def ensure_existing_flow_topics(self):
"""Ensure topics exist for all already-running flows.
async def ensure_existing_flow_queues(self):
"""Ensure queues exist for all already-running flows.
Called on startup to handle flows that were started before this
version of the flow service was deployed, or before a restart.
@ -315,7 +315,7 @@ class FlowConfig:
if blueprint_data is None:
logger.warning(
f"Blueprint '{blueprint_name}' not found for "
f"flow '{flow_id}', skipping topic creation"
f"flow '{flow_id}', skipping queue creation"
)
continue
@ -333,63 +333,65 @@ class FlowConfig:
)
return result
topics = self._collect_flow_topics(cls, repl_template)
for topic in topics:
await self.pubsub.ensure_topic(topic)
queues = self._collect_flow_queues(cls, repl_template)
for topic, subscription in queues:
await self.pubsub.ensure_queue(topic, subscription)
logger.info(
f"Ensured topics for existing flow '{flow_id}'"
f"Ensured queues for existing flow '{flow_id}'"
)
except Exception as e:
logger.error(
f"Failed to ensure topics for flow '{flow_id}': {e}"
f"Failed to ensure queues for flow '{flow_id}': {e}"
)
def _collect_flow_topics(self, cls, repl_template):
"""Collect unique topic identifiers from the blueprint.
def _collect_flow_queues(self, cls, repl_template):
"""Collect (topic, subscription) pairs for all flow-level queues.
Iterates the blueprint's "flow" section and returns a
deduplicated set of resolved topic strings. The flow service
manages topic lifecycle (create/delete exchanges), not
individual consumer queues.
Iterates the blueprint's "flow" section and reads only the
"topics" dict from each processor entry.
"""
topics = set()
queues = []
for k, v in cls["flow"].items():
processor, variant = k.split(":", 1)
variant = repl_template(variant)
for spec_name, topic_template in v.get("topics", {}).items():
topic = repl_template(topic_template)
topics.add(topic)
subscription = f"{processor}--{variant}--{spec_name}"
queues.append((topic, subscription))
return topics
return queues
async def _delete_topics(self, topics):
"""Delete topics with retries. Best-effort — logs failures but
async def _delete_queues(self, queues):
"""Delete queues with retries. Best-effort — logs failures but
does not raise."""
for attempt in range(DELETE_RETRIES):
remaining = []
for topic in topics:
for topic, subscription in queues:
try:
await self.pubsub.delete_topic(topic)
await self.pubsub.delete_queue(topic, subscription)
except Exception as e:
logger.warning(
f"Topic delete failed (attempt {attempt + 1}/"
f"Queue delete failed (attempt {attempt + 1}/"
f"{DELETE_RETRIES}): {topic}: {e}"
)
remaining.append(topic)
remaining.append((topic, subscription))
if not remaining:
return
topics = remaining
queues = remaining
if attempt < DELETE_RETRIES - 1:
await asyncio.sleep(DELETE_RETRY_DELAY)
for topic in topics:
for topic, subscription in queues:
logger.error(
f"Failed to delete topic after {DELETE_RETRIES} "
f"Failed to delete queue after {DELETE_RETRIES} "
f"attempts: {topic}"
)
@ -424,8 +426,8 @@ class FlowConfig:
result = result.replace(f"{{{param_name}}}", str(param_value))
return result
# Collect topic identifiers before removing config
topics = self._collect_flow_topics(cls, repl_template)
# Collect queue identifiers before removing config
queues = self._collect_flow_queues(cls, repl_template)
# Phase 1: Set status to "stopping" and remove processor config.
# The config push tells processors to shut down their consumers.
@ -446,8 +448,8 @@ class FlowConfig:
await self.config.delete_many(deletes)
# Phase 2: Delete topics with retries, then remove the flow record.
await self._delete_topics(topics)
# Phase 2: Delete queues with retries, then remove the flow record.
await self._delete_queues(queues)
if msg.flow_id in await self.config.keys("flow"):
await self.config.delete("flow", msg.flow_id)

View file

@ -101,9 +101,11 @@ class Processor(AsyncProcessor):
async def start(self):
await self.pubsub.ensure_topic(self.flow_request_topic)
await self.pubsub.ensure_queue(
self.flow_request_topic, self.flow_request_subscriber
)
await self.config_client.start()
await self.flow.ensure_existing_flow_topics()
await self.flow.ensure_existing_flow_queues()
await self.flow_request_consumer.start()
async def on_flow_request(self, msg, consumer, flow):

View file

@ -263,8 +263,12 @@ class Processor(AsyncProcessor):
async def start(self):
await self.pubsub.ensure_topic(self.librarian_request_topic)
await self.pubsub.ensure_topic(self.collection_request_topic)
await self.pubsub.ensure_queue(
self.librarian_request_topic, self.librarian_request_subscriber
)
await self.pubsub.ensure_queue(
self.collection_request_topic, self.collection_request_subscriber
)
await super(Processor, self).start()
await self.librarian_request_consumer.start()
await self.librarian_response_producer.start()