mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-06-30 00:49:38 +02:00
Compare commits
2 commits
9f84891fcc
...
3505bfdd25
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3505bfdd25 | ||
|
|
391b9076f3 |
11 changed files with 290 additions and 231 deletions
|
|
@ -231,6 +231,52 @@ class TestTripleValidation:
|
|||
is_valid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset)
|
||||
assert is_valid == expected, f"Validation of {predicate} should be {expected}"
|
||||
|
||||
def test_validates_domain_correctly_with_entity_types(self, extractor, sample_ontology_subset):
|
||||
"""Test domain validation correctly compares against extracted entity_types."""
|
||||
subject = "my-recipe"
|
||||
predicate = "produces"
|
||||
object_val = "my-food"
|
||||
|
||||
# Proper domain for produces is Recipe
|
||||
entity_types = {
|
||||
"my-recipe": "Recipe",
|
||||
"my-food": "Food"
|
||||
}
|
||||
|
||||
is_valid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types)
|
||||
assert is_valid, "Valid domain should be accepted"
|
||||
|
||||
# Invalid domain
|
||||
entity_types_invalid = {
|
||||
"my-recipe": "Ingredient",
|
||||
"my-food": "Food"
|
||||
}
|
||||
is_invalid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types_invalid)
|
||||
assert not is_invalid, "Invalid domain should be rejected"
|
||||
|
||||
def test_validates_range_correctly_with_entity_types(self, extractor, sample_ontology_subset):
|
||||
"""Test range validation correctly compares against extracted entity_types."""
|
||||
subject = "my-recipe"
|
||||
predicate = "produces"
|
||||
object_val = "my-food"
|
||||
|
||||
# Proper range for produces is Food
|
||||
entity_types = {
|
||||
"my-recipe": "Recipe",
|
||||
"my-food": "Food"
|
||||
}
|
||||
|
||||
is_valid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types)
|
||||
assert is_valid, "Valid range should be accepted"
|
||||
|
||||
# Invalid range
|
||||
entity_types_invalid = {
|
||||
"my-recipe": "Recipe",
|
||||
"my-food": "Recipe"
|
||||
}
|
||||
is_invalid = extractor.is_valid_triple(subject, predicate, object_val, sample_ontology_subset, entity_types_invalid)
|
||||
assert not is_invalid, "Invalid range should be rejected"
|
||||
|
||||
|
||||
class TestTripleParsing:
|
||||
"""Test suite for parsing triples from LLM responses."""
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
"""
|
||||
Unit tests for RabbitMQ backend — queue name mapping and factory dispatch.
|
||||
Unit tests for RabbitMQ backend — topic parsing and factory dispatch.
|
||||
Does not require a running RabbitMQ instance.
|
||||
"""
|
||||
|
||||
|
|
@ -12,7 +12,7 @@ from trustgraph.base.rabbitmq_backend import RabbitMQBackend
|
|||
from trustgraph.base.pubsub import get_pubsub, add_pubsub_args
|
||||
|
||||
|
||||
class TestRabbitMQMapQueueName:
|
||||
class TestRabbitMQParseTopic:
|
||||
|
||||
@pytest.fixture
|
||||
def backend(self):
|
||||
|
|
@ -20,43 +20,48 @@ class TestRabbitMQMapQueueName:
|
|||
return b
|
||||
|
||||
def test_flow_is_durable(self, backend):
|
||||
name, durable = backend.map_queue_name('flow:tg:text-completion-request')
|
||||
exchange, cls, durable = backend._parse_topic('flow:tg:text-completion-request')
|
||||
assert durable is True
|
||||
assert name == 'tg.flow.text-completion-request'
|
||||
assert cls == 'flow'
|
||||
assert exchange == 'tg.flow.text-completion-request'
|
||||
|
||||
def test_notify_is_not_durable(self, backend):
|
||||
name, durable = backend.map_queue_name('notify:tg:config')
|
||||
exchange, cls, durable = backend._parse_topic('notify:tg:config')
|
||||
assert durable is False
|
||||
assert name == 'tg.notify.config'
|
||||
assert cls == 'notify'
|
||||
assert exchange == 'tg.notify.config'
|
||||
|
||||
def test_request_is_not_durable(self, backend):
|
||||
name, durable = backend.map_queue_name('request:tg:config')
|
||||
exchange, cls, durable = backend._parse_topic('request:tg:config')
|
||||
assert durable is False
|
||||
assert name == 'tg.request.config'
|
||||
assert cls == 'request'
|
||||
assert exchange == 'tg.request.config'
|
||||
|
||||
def test_response_is_not_durable(self, backend):
|
||||
name, durable = backend.map_queue_name('response:tg:librarian')
|
||||
exchange, cls, durable = backend._parse_topic('response:tg:librarian')
|
||||
assert durable is False
|
||||
assert name == 'tg.response.librarian'
|
||||
assert cls == 'response'
|
||||
assert exchange == 'tg.response.librarian'
|
||||
|
||||
def test_custom_topicspace(self, backend):
|
||||
name, durable = backend.map_queue_name('flow:prod:my-queue')
|
||||
assert name == 'prod.flow.my-queue'
|
||||
exchange, cls, durable = backend._parse_topic('flow:prod:my-queue')
|
||||
assert exchange == 'prod.flow.my-queue'
|
||||
assert durable is True
|
||||
|
||||
def test_no_colon_defaults_to_flow(self, backend):
|
||||
name, durable = backend.map_queue_name('simple-queue')
|
||||
assert name == 'tg.simple-queue'
|
||||
assert durable is False
|
||||
exchange, cls, durable = backend._parse_topic('simple-queue')
|
||||
assert exchange == 'tg.flow.simple-queue'
|
||||
assert cls == 'flow'
|
||||
assert durable is True
|
||||
|
||||
def test_invalid_class_raises(self, backend):
|
||||
with pytest.raises(ValueError, match="Invalid queue class"):
|
||||
backend.map_queue_name('unknown:tg:topic')
|
||||
with pytest.raises(ValueError, match="Invalid topic class"):
|
||||
backend._parse_topic('unknown:tg:topic')
|
||||
|
||||
def test_flow_with_flow_suffix(self, backend):
|
||||
"""Queue names with flow suffix (e.g. :default) are preserved."""
|
||||
name, durable = backend.map_queue_name('request:tg:prompt:default')
|
||||
assert name == 'tg.request.prompt:default'
|
||||
def test_topic_with_flow_suffix(self, backend):
|
||||
"""Topic names with flow suffix (e.g. :default) are preserved."""
|
||||
exchange, cls, durable = backend._parse_topic('request:tg:prompt:default')
|
||||
assert exchange == 'tg.request.prompt:default'
|
||||
|
||||
|
||||
class TestGetPubsubRabbitMQ:
|
||||
|
|
|
|||
|
|
@ -121,7 +121,7 @@ class PubSubBackend(Protocol):
|
|||
Create a producer for a topic.
|
||||
|
||||
Args:
|
||||
topic: Generic topic format (qos/tenant/namespace/queue)
|
||||
topic: Queue identifier in class:topicspace:topic format
|
||||
schema: Dataclass type for messages
|
||||
**options: Backend-specific options (e.g., chunking_enabled)
|
||||
|
||||
|
|
@ -159,59 +159,55 @@ class PubSubBackend(Protocol):
|
|||
"""
|
||||
...
|
||||
|
||||
async def create_queue(self, topic: str, subscription: str) -> None:
|
||||
async def create_topic(self, topic: str) -> None:
|
||||
"""
|
||||
Pre-create a queue so it exists before any consumer connects.
|
||||
Create the broker-side resources for a logical topic.
|
||||
|
||||
The topic and subscription together identify the queue, mirroring
|
||||
create_consumer where the queue name is derived from both.
|
||||
For RabbitMQ this creates a fanout exchange. For Pulsar this is
|
||||
a no-op (topics auto-create on first use).
|
||||
|
||||
Idempotent — creating an already-existing queue succeeds silently.
|
||||
Idempotent — creating an already-existing topic succeeds silently.
|
||||
|
||||
Args:
|
||||
topic: Queue identifier in class:topicspace:topic format
|
||||
subscription: Subscription/consumer group name
|
||||
topic: Topic identifier in class:topicspace:topic format
|
||||
"""
|
||||
...
|
||||
|
||||
async def delete_queue(self, topic: str, subscription: str) -> None:
|
||||
async def delete_topic(self, topic: str) -> None:
|
||||
"""
|
||||
Delete a queue and any messages it contains.
|
||||
Delete a topic and discard any in-flight messages.
|
||||
|
||||
The topic and subscription together identify the queue, mirroring
|
||||
create_consumer where the queue name is derived from both.
|
||||
For RabbitMQ this deletes the fanout exchange; consumer queues
|
||||
lose their binding and drain naturally.
|
||||
|
||||
Idempotent — deleting a non-existent queue succeeds silently.
|
||||
Idempotent — deleting a non-existent topic succeeds silently.
|
||||
|
||||
Args:
|
||||
topic: Queue identifier in class:topicspace:topic format
|
||||
subscription: Subscription/consumer group name
|
||||
topic: Topic identifier in class:topicspace:topic format
|
||||
"""
|
||||
...
|
||||
|
||||
async def queue_exists(self, topic: str, subscription: str) -> bool:
|
||||
async def topic_exists(self, topic: str) -> bool:
|
||||
"""
|
||||
Check whether a queue exists.
|
||||
Check whether a topic exists.
|
||||
|
||||
Args:
|
||||
topic: Queue identifier in class:topicspace:topic format
|
||||
subscription: Subscription/consumer group name
|
||||
topic: Topic identifier in class:topicspace:topic format
|
||||
|
||||
Returns:
|
||||
True if the queue exists, False otherwise.
|
||||
True if the topic exists, False otherwise.
|
||||
"""
|
||||
...
|
||||
|
||||
async def ensure_queue(self, topic: str, subscription: str) -> None:
|
||||
async def ensure_topic(self, topic: str) -> None:
|
||||
"""
|
||||
Ensure a queue exists, creating it if necessary.
|
||||
Ensure a topic exists, creating it if necessary.
|
||||
|
||||
Convenience wrapper — checks existence, creates if missing.
|
||||
Used by system services on startup.
|
||||
Used by the flow service and system services on startup.
|
||||
|
||||
Args:
|
||||
topic: Queue identifier in class:topicspace:topic format
|
||||
subscription: Subscription/consumer group name
|
||||
topic: Topic identifier in class:topicspace:topic format
|
||||
"""
|
||||
...
|
||||
|
||||
|
|
|
|||
|
|
@ -266,22 +266,22 @@ class PulsarBackend:
|
|||
|
||||
return PulsarBackendConsumer(pulsar_consumer, schema)
|
||||
|
||||
async def create_queue(self, topic: str, subscription: str) -> None:
|
||||
async def create_topic(self, topic: str) -> None:
|
||||
"""No-op — Pulsar auto-creates topics on first use.
|
||||
TODO: Use admin REST API for explicit persistent topic creation."""
|
||||
pass
|
||||
|
||||
async def delete_queue(self, topic: str, subscription: str) -> None:
|
||||
async def delete_topic(self, topic: str) -> None:
|
||||
"""No-op — to be replaced with admin REST API calls.
|
||||
TODO: Delete subscription and persistent topic via admin API."""
|
||||
TODO: Delete persistent topic via admin API."""
|
||||
pass
|
||||
|
||||
async def queue_exists(self, topic: str, subscription: str) -> bool:
|
||||
async def topic_exists(self, topic: str) -> bool:
|
||||
"""Returns True — Pulsar auto-creates on subscribe.
|
||||
TODO: Use admin REST API for actual existence check."""
|
||||
return True
|
||||
|
||||
async def ensure_queue(self, topic: str, subscription: str) -> None:
|
||||
async def ensure_topic(self, topic: str) -> None:
|
||||
"""No-op — Pulsar auto-creates topics on first use.
|
||||
TODO: Use admin REST API for explicit creation."""
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -1,22 +1,24 @@
|
|||
"""
|
||||
RabbitMQ backend implementation for pub/sub abstraction.
|
||||
|
||||
Uses a single topic exchange per topicspace. The logical queue name
|
||||
becomes the routing key. Consumer behavior is determined by the
|
||||
subscription name:
|
||||
Each logical topic maps to its own fanout exchange. The exchange name
|
||||
encodes the full topic identity:
|
||||
|
||||
- Same subscription + same topic = shared queue (competing consumers)
|
||||
- Different subscriptions = separate queues (broadcast / fan-out)
|
||||
class:topicspace:topic → exchange topicspace.class.topic
|
||||
|
||||
This mirrors Pulsar's subscription model using idiomatic RabbitMQ.
|
||||
Producers publish to the exchange with an empty routing key.
|
||||
Consumers declare and bind their own queues:
|
||||
|
||||
- flow / request: named durable/non-durable queue (competing consumers)
|
||||
- response / notify: anonymous exclusive auto-delete queue (per-subscriber)
|
||||
|
||||
The flow service manages topic lifecycle (create/delete exchanges).
|
||||
Consumers manage their own queue lifecycle (declare + bind on connect).
|
||||
|
||||
Architecture:
|
||||
Producer --> [tg exchange] --routing key--> [named queue] --> Consumer
|
||||
--routing key--> [named queue] --> Consumer
|
||||
--routing key--> [exclusive q] --> Subscriber
|
||||
|
||||
Uses basic_consume (push) instead of basic_get (polling) for
|
||||
efficient message delivery.
|
||||
Producer --> [fanout exchange] --> [named queue] --> Consumer
|
||||
--> [named queue] --> Consumer
|
||||
--> [exclusive queue] --> Subscriber
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
|
@ -58,18 +60,16 @@ class RabbitMQMessage:
|
|||
|
||||
|
||||
class RabbitMQBackendProducer:
|
||||
"""Publishes messages to a topic exchange with a routing key.
|
||||
"""Publishes messages to a fanout exchange.
|
||||
|
||||
Uses thread-local connections so each thread gets its own
|
||||
connection/channel. This avoids wire corruption from concurrent
|
||||
threads writing to the same socket (pika is not thread-safe).
|
||||
"""
|
||||
|
||||
def __init__(self, connection_params, exchange_name, routing_key,
|
||||
durable):
|
||||
def __init__(self, connection_params, exchange_name, durable):
|
||||
self._connection_params = connection_params
|
||||
self._exchange_name = exchange_name
|
||||
self._routing_key = routing_key
|
||||
self._durable = durable
|
||||
self._local = threading.local()
|
||||
|
||||
|
|
@ -90,7 +90,7 @@ class RabbitMQBackendProducer:
|
|||
chan = conn.channel()
|
||||
chan.exchange_declare(
|
||||
exchange=self._exchange_name,
|
||||
exchange_type='topic',
|
||||
exchange_type='fanout',
|
||||
durable=True,
|
||||
)
|
||||
self._local.connection = conn
|
||||
|
|
@ -113,7 +113,7 @@ class RabbitMQBackendProducer:
|
|||
channel = self._get_channel()
|
||||
channel.basic_publish(
|
||||
exchange=self._exchange_name,
|
||||
routing_key=self._routing_key,
|
||||
routing_key='',
|
||||
body=json_data.encode('utf-8'),
|
||||
properties=amqp_properties,
|
||||
)
|
||||
|
|
@ -144,19 +144,17 @@ class RabbitMQBackendProducer:
|
|||
|
||||
|
||||
class RabbitMQBackendConsumer:
|
||||
"""Consumes from a queue bound to a topic exchange.
|
||||
"""Consumes from a queue bound to a fanout exchange.
|
||||
|
||||
Uses basic_consume (push model) with messages delivered to an
|
||||
internal thread-safe queue. process_data_events() drives both
|
||||
message delivery and heartbeat processing.
|
||||
"""
|
||||
|
||||
def __init__(self, connection_params, exchange_name, routing_key,
|
||||
queue_name, schema_cls, durable, exclusive=False,
|
||||
auto_delete=False):
|
||||
def __init__(self, connection_params, exchange_name, queue_name,
|
||||
schema_cls, durable, exclusive=False, auto_delete=False):
|
||||
self._connection_params = connection_params
|
||||
self._exchange_name = exchange_name
|
||||
self._routing_key = routing_key
|
||||
self._queue_name = queue_name
|
||||
self._schema_cls = schema_cls
|
||||
self._durable = durable
|
||||
|
|
@ -171,17 +169,16 @@ class RabbitMQBackendConsumer:
|
|||
self._connection = pika.BlockingConnection(self._connection_params)
|
||||
self._channel = self._connection.channel()
|
||||
|
||||
# Declare the topic exchange (idempotent, also done by producers)
|
||||
# Declare the fanout exchange (idempotent)
|
||||
self._channel.exchange_declare(
|
||||
exchange=self._exchange_name,
|
||||
exchange_type='topic',
|
||||
exchange_type='fanout',
|
||||
durable=True,
|
||||
)
|
||||
|
||||
if self._exclusive:
|
||||
# Anonymous ephemeral queue (response/notify class).
|
||||
# These are per-consumer and must be created here — the
|
||||
# broker assigns the name.
|
||||
# Per-consumer, broker assigns the name.
|
||||
result = self._channel.queue_declare(
|
||||
queue='',
|
||||
durable=False,
|
||||
|
|
@ -189,20 +186,22 @@ class RabbitMQBackendConsumer:
|
|||
auto_delete=True,
|
||||
)
|
||||
self._queue_name = result.method.queue
|
||||
|
||||
self._channel.queue_bind(
|
||||
queue=self._queue_name,
|
||||
exchange=self._exchange_name,
|
||||
routing_key=self._routing_key,
|
||||
)
|
||||
else:
|
||||
# Named queue (flow/request class). Queue must already
|
||||
# exist — created by the flow service or ensure_queue.
|
||||
# We just verify it exists and bind to consume.
|
||||
# Named queue (flow/request class).
|
||||
# Consumer owns its queue — declare and bind here.
|
||||
self._channel.queue_declare(
|
||||
queue=self._queue_name, passive=True,
|
||||
queue=self._queue_name,
|
||||
durable=self._durable,
|
||||
exclusive=False,
|
||||
auto_delete=False,
|
||||
)
|
||||
|
||||
# Bind queue to the fanout exchange
|
||||
self._channel.queue_bind(
|
||||
queue=self._queue_name,
|
||||
exchange=self._exchange_name,
|
||||
)
|
||||
|
||||
self._channel.basic_qos(prefetch_count=1)
|
||||
|
||||
# Register push-based consumer
|
||||
|
|
@ -318,7 +317,7 @@ class RabbitMQBackendConsumer:
|
|||
|
||||
|
||||
class RabbitMQBackend:
|
||||
"""RabbitMQ pub/sub backend using a topic exchange per topicspace."""
|
||||
"""RabbitMQ pub/sub backend using one fanout exchange per topic."""
|
||||
|
||||
def __init__(self, host='localhost', port=5672, username='guest',
|
||||
password='guest', vhost='/'):
|
||||
|
|
@ -331,20 +330,23 @@ class RabbitMQBackend:
|
|||
)
|
||||
logger.info(f"RabbitMQ backend: {host}:{port} vhost={vhost}")
|
||||
|
||||
def _parse_queue_id(self, queue_id: str) -> tuple[str, str, str, bool]:
|
||||
def _parse_topic(self, topic_id: str) -> tuple[str, str, bool]:
|
||||
"""
|
||||
Parse queue identifier into exchange, routing key, and durability.
|
||||
Parse topic identifier into exchange name and durability.
|
||||
|
||||
Format: class:topicspace:topic
|
||||
Returns: (exchange_name, routing_key, class, durable)
|
||||
"""
|
||||
if ':' not in queue_id:
|
||||
return 'tg', queue_id, 'flow', False
|
||||
Returns: (exchange_name, class, durable)
|
||||
|
||||
parts = queue_id.split(':', 2)
|
||||
The exchange name encodes the full topic identity:
|
||||
class:topicspace:topic → topicspace.class.topic
|
||||
"""
|
||||
if ':' not in topic_id:
|
||||
return f'tg.flow.{topic_id}', 'flow', True
|
||||
|
||||
parts = topic_id.split(':', 2)
|
||||
if len(parts) != 3:
|
||||
raise ValueError(
|
||||
f"Invalid queue format: {queue_id}, "
|
||||
f"Invalid topic format: {topic_id}, "
|
||||
f"expected class:topicspace:topic"
|
||||
)
|
||||
|
||||
|
|
@ -356,36 +358,28 @@ class RabbitMQBackend:
|
|||
durable = False
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Invalid queue class: {cls}, "
|
||||
f"Invalid topic class: {cls}, "
|
||||
f"expected flow, request, response, or notify"
|
||||
)
|
||||
|
||||
# Exchange per topicspace, routing key includes class
|
||||
exchange_name = topicspace
|
||||
routing_key = f"{cls}.{topic}"
|
||||
exchange_name = f"{topicspace}.{cls}.{topic}"
|
||||
|
||||
return exchange_name, routing_key, cls, durable
|
||||
|
||||
# Keep map_queue_name for backward compatibility with tests
|
||||
def map_queue_name(self, queue_id: str) -> tuple[str, bool]:
|
||||
exchange, routing_key, cls, durable = self._parse_queue_id(queue_id)
|
||||
return f"{exchange}.{routing_key}", durable
|
||||
return exchange_name, cls, durable
|
||||
|
||||
def create_producer(self, topic: str, schema: type,
|
||||
**options) -> BackendProducer:
|
||||
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
|
||||
exchange, cls, durable = self._parse_topic(topic)
|
||||
logger.debug(
|
||||
f"Creating producer: exchange={exchange}, "
|
||||
f"routing_key={routing_key}"
|
||||
f"Creating producer: exchange={exchange}"
|
||||
)
|
||||
return RabbitMQBackendProducer(
|
||||
self._connection_params, exchange, routing_key, durable,
|
||||
self._connection_params, exchange, durable,
|
||||
)
|
||||
|
||||
def create_consumer(self, topic: str, subscription: str, schema: type,
|
||||
initial_position: str = 'latest',
|
||||
**options) -> BackendConsumer:
|
||||
"""Create a consumer with a queue bound to the topic exchange.
|
||||
"""Create a consumer with a queue bound to the topic's exchange.
|
||||
|
||||
Behaviour is determined by the topic's class prefix:
|
||||
- flow: named durable queue, competing consumers (round-robin)
|
||||
|
|
@ -393,7 +387,7 @@ class RabbitMQBackend:
|
|||
- response: anonymous ephemeral queue, per-subscriber (auto-delete)
|
||||
- notify: anonymous ephemeral queue, per-subscriber (auto-delete)
|
||||
"""
|
||||
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
|
||||
exchange, cls, durable = self._parse_topic(topic)
|
||||
|
||||
if cls in ('response', 'notify'):
|
||||
# Per-subscriber: anonymous queue, auto-deleted on disconnect
|
||||
|
|
@ -403,45 +397,33 @@ class RabbitMQBackend:
|
|||
auto_delete = True
|
||||
else:
|
||||
# Shared: named queue, competing consumers
|
||||
queue_name = f"{exchange}.{routing_key}.{subscription}"
|
||||
queue_name = f"{exchange}.{subscription}"
|
||||
queue_durable = durable
|
||||
exclusive = False
|
||||
auto_delete = False
|
||||
|
||||
logger.debug(
|
||||
f"Creating consumer: exchange={exchange}, "
|
||||
f"routing_key={routing_key}, queue={queue_name or '(anonymous)'}, "
|
||||
f"cls={cls}"
|
||||
f"queue={queue_name or '(anonymous)'}, cls={cls}"
|
||||
)
|
||||
|
||||
return RabbitMQBackendConsumer(
|
||||
self._connection_params, exchange, routing_key,
|
||||
self._connection_params, exchange,
|
||||
queue_name, schema, queue_durable, exclusive, auto_delete,
|
||||
)
|
||||
|
||||
def _create_queue_sync(self, exchange, routing_key, queue_name, durable):
|
||||
"""Blocking queue creation — run via asyncio.to_thread."""
|
||||
def _create_topic_sync(self, exchange_name):
|
||||
"""Blocking exchange creation — run via asyncio.to_thread."""
|
||||
connection = None
|
||||
try:
|
||||
connection = pika.BlockingConnection(self._connection_params)
|
||||
channel = connection.channel()
|
||||
channel.exchange_declare(
|
||||
exchange=exchange,
|
||||
exchange_type='topic',
|
||||
exchange=exchange_name,
|
||||
exchange_type='fanout',
|
||||
durable=True,
|
||||
)
|
||||
channel.queue_declare(
|
||||
queue=queue_name,
|
||||
durable=durable,
|
||||
exclusive=False,
|
||||
auto_delete=False,
|
||||
)
|
||||
channel.queue_bind(
|
||||
queue=queue_name,
|
||||
exchange=exchange,
|
||||
routing_key=routing_key,
|
||||
)
|
||||
logger.info(f"Created queue: {queue_name}")
|
||||
logger.info(f"Created topic (exchange): {exchange_name}")
|
||||
finally:
|
||||
if connection and connection.is_open:
|
||||
try:
|
||||
|
|
@ -449,34 +431,30 @@ class RabbitMQBackend:
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
async def create_queue(self, topic: str, subscription: str) -> None:
|
||||
"""Pre-create a named queue bound to the topic exchange.
|
||||
async def create_topic(self, topic: str) -> None:
|
||||
"""Create the fanout exchange for a logical topic.
|
||||
|
||||
Only applies to shared queues (flow/request class). Response and
|
||||
notify queues are anonymous/auto-delete and created by consumers.
|
||||
Only applies to flow and request class topics. Response and
|
||||
notify exchanges are created on demand by consumers.
|
||||
"""
|
||||
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
|
||||
exchange, cls, durable = self._parse_topic(topic)
|
||||
|
||||
if cls in ('response', 'notify'):
|
||||
return
|
||||
|
||||
queue_name = f"{exchange}.{routing_key}.{subscription}"
|
||||
await asyncio.to_thread(
|
||||
self._create_queue_sync, exchange, routing_key,
|
||||
queue_name, durable,
|
||||
)
|
||||
await asyncio.to_thread(self._create_topic_sync, exchange)
|
||||
|
||||
def _delete_queue_sync(self, queue_name):
|
||||
"""Blocking queue deletion — run via asyncio.to_thread."""
|
||||
def _delete_topic_sync(self, exchange_name):
|
||||
"""Blocking exchange deletion — run via asyncio.to_thread."""
|
||||
connection = None
|
||||
try:
|
||||
connection = pika.BlockingConnection(self._connection_params)
|
||||
channel = connection.channel()
|
||||
channel.queue_delete(queue=queue_name)
|
||||
logger.info(f"Deleted queue: {queue_name}")
|
||||
channel.exchange_delete(exchange=exchange_name)
|
||||
logger.info(f"Deleted topic (exchange): {exchange_name}")
|
||||
except Exception as e:
|
||||
# Idempotent — queue may already be gone
|
||||
logger.debug(f"Queue delete for {queue_name}: {e}")
|
||||
# Idempotent — exchange may already be gone
|
||||
logger.debug(f"Exchange delete for {exchange_name}: {e}")
|
||||
finally:
|
||||
if connection and connection.is_open:
|
||||
try:
|
||||
|
|
@ -484,31 +462,27 @@ class RabbitMQBackend:
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
async def delete_queue(self, topic: str, subscription: str) -> None:
|
||||
"""Delete a named queue and any messages it contains.
|
||||
async def delete_topic(self, topic: str) -> None:
|
||||
"""Delete a topic's fanout exchange.
|
||||
|
||||
Only applies to shared queues (flow/request class). Response and
|
||||
notify queues are anonymous/auto-delete and managed by the broker.
|
||||
Consumer queues lose their binding and drain naturally.
|
||||
"""
|
||||
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
|
||||
exchange, cls, durable = self._parse_topic(topic)
|
||||
await asyncio.to_thread(self._delete_topic_sync, exchange)
|
||||
|
||||
if cls in ('response', 'notify'):
|
||||
return
|
||||
|
||||
queue_name = f"{exchange}.{routing_key}.{subscription}"
|
||||
await asyncio.to_thread(self._delete_queue_sync, queue_name)
|
||||
|
||||
def _queue_exists_sync(self, queue_name):
|
||||
"""Blocking queue existence check — run via asyncio.to_thread.
|
||||
def _topic_exists_sync(self, exchange_name):
|
||||
"""Blocking exchange existence check — run via asyncio.to_thread.
|
||||
Uses passive=True which checks without creating."""
|
||||
connection = None
|
||||
try:
|
||||
connection = pika.BlockingConnection(self._connection_params)
|
||||
channel = connection.channel()
|
||||
channel.queue_declare(queue=queue_name, passive=True)
|
||||
channel.exchange_declare(
|
||||
exchange=exchange_name, passive=True,
|
||||
)
|
||||
return True
|
||||
except pika.exceptions.ChannelClosedByBroker:
|
||||
# 404 NOT_FOUND — queue does not exist
|
||||
# 404 NOT_FOUND — exchange does not exist
|
||||
return False
|
||||
finally:
|
||||
if connection and connection.is_open:
|
||||
|
|
@ -517,26 +491,25 @@ class RabbitMQBackend:
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
async def queue_exists(self, topic: str, subscription: str) -> bool:
|
||||
"""Check whether a named queue exists.
|
||||
async def topic_exists(self, topic: str) -> bool:
|
||||
"""Check whether a topic's exchange exists.
|
||||
|
||||
Only applies to shared queues (flow/request class). Response and
|
||||
notify queues are anonymous/ephemeral — always returns False.
|
||||
Only applies to flow and request class topics. Response and
|
||||
notify topics are ephemeral — always returns False.
|
||||
"""
|
||||
exchange, routing_key, cls, durable = self._parse_queue_id(topic)
|
||||
exchange, cls, durable = self._parse_topic(topic)
|
||||
|
||||
if cls in ('response', 'notify'):
|
||||
return False
|
||||
|
||||
queue_name = f"{exchange}.{routing_key}.{subscription}"
|
||||
return await asyncio.to_thread(
|
||||
self._queue_exists_sync, queue_name
|
||||
self._topic_exists_sync, exchange
|
||||
)
|
||||
|
||||
async def ensure_queue(self, topic: str, subscription: str) -> None:
|
||||
"""Ensure a queue exists, creating it if necessary."""
|
||||
if not await self.queue_exists(topic, subscription):
|
||||
await self.create_queue(topic, subscription)
|
||||
async def ensure_topic(self, topic: str) -> None:
|
||||
"""Ensure a topic exists, creating it if necessary."""
|
||||
if not await self.topic_exists(topic):
|
||||
await self.create_topic(topic)
|
||||
|
||||
def close(self) -> None:
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -124,9 +124,7 @@ class Processor(AsyncProcessor):
|
|||
|
||||
async def start(self):
|
||||
|
||||
await self.pubsub.ensure_queue(
|
||||
self.config_request_topic, self.config_request_subscriber
|
||||
)
|
||||
await self.pubsub.ensure_topic(self.config_request_topic)
|
||||
await self.push() # Startup poke: empty types = everything
|
||||
await self.config_request_consumer.start()
|
||||
|
||||
|
|
|
|||
|
|
@ -119,9 +119,7 @@ class Processor(AsyncProcessor):
|
|||
|
||||
async def start(self):
|
||||
|
||||
await self.pubsub.ensure_queue(
|
||||
self.knowledge_request_topic, self.knowledge_request_subscriber
|
||||
)
|
||||
await self.pubsub.ensure_topic(self.knowledge_request_topic)
|
||||
await super(Processor, self).start()
|
||||
await self.knowledge_request_consumer.start()
|
||||
await self.knowledge_response_producer.start()
|
||||
|
|
|
|||
|
|
@ -429,6 +429,16 @@ class Processor(FlowProcessor):
|
|||
validated_triples = []
|
||||
ontology_id = ontology_subset.ontology_id
|
||||
|
||||
# Gather entity types for domain/range validation
|
||||
entity_types = {}
|
||||
for triple_data in triples_response:
|
||||
if isinstance(triple_data, dict):
|
||||
s = triple_data.get('subject', '')
|
||||
p = triple_data.get('predicate', '')
|
||||
o = triple_data.get('object', '')
|
||||
if s and p and o and (p == "rdf:type" or p == str(RDF_TYPE)):
|
||||
entity_types[s] = o
|
||||
|
||||
for triple_data in triples_response:
|
||||
try:
|
||||
if isinstance(triple_data, dict):
|
||||
|
|
@ -440,7 +450,7 @@ class Processor(FlowProcessor):
|
|||
continue
|
||||
|
||||
# Validate against ontology
|
||||
if self.is_valid_triple(subject, predicate, object_val, ontology_subset):
|
||||
if self.is_valid_triple(subject, predicate, object_val, ontology_subset, entity_types):
|
||||
# Expand URIs before creating Value objects
|
||||
subject_uri = self.expand_uri(subject, ontology_subset, ontology_id)
|
||||
predicate_uri = self.expand_uri(predicate, ontology_subset, ontology_id)
|
||||
|
|
@ -493,8 +503,11 @@ class Processor(FlowProcessor):
|
|||
return False
|
||||
|
||||
def is_valid_triple(self, subject: str, predicate: str, object_val: str,
|
||||
ontology_subset: OntologySubset) -> bool:
|
||||
ontology_subset: OntologySubset, entity_types: dict = None) -> bool:
|
||||
"""Validate triple against ontology constraints."""
|
||||
if entity_types is None:
|
||||
entity_types = {}
|
||||
|
||||
# Special case for rdf:type
|
||||
if predicate == "rdf:type" or predicate == str(RDF_TYPE):
|
||||
# Check if object is a valid class
|
||||
|
|
@ -511,7 +524,45 @@ class Processor(FlowProcessor):
|
|||
if not is_obj_prop and not is_dt_prop:
|
||||
return False # Unknown property
|
||||
|
||||
# TODO: Add more sophisticated validation (domain/range checking)
|
||||
prop_def = ontology_subset.object_properties[predicate] if is_obj_prop else ontology_subset.datatype_properties[predicate]
|
||||
if not isinstance(prop_def, dict):
|
||||
prop_def = prop_def.__dict__ if hasattr(prop_def, '__dict__') else {}
|
||||
|
||||
# Domain validation
|
||||
expected_domain = prop_def.get('domain')
|
||||
if expected_domain and subject in entity_types:
|
||||
actual_domain = entity_types[subject]
|
||||
if actual_domain != expected_domain:
|
||||
is_subclass = False
|
||||
curr_class = actual_domain
|
||||
while curr_class in ontology_subset.classes:
|
||||
cls_def = ontology_subset.classes[curr_class]
|
||||
parent = cls_def.get('subclass_of') if isinstance(cls_def, dict) else None
|
||||
if parent == expected_domain:
|
||||
is_subclass = True
|
||||
break
|
||||
curr_class = parent
|
||||
if not is_subclass:
|
||||
return False
|
||||
|
||||
# Range validation
|
||||
if is_obj_prop:
|
||||
expected_range = prop_def.get('range')
|
||||
if expected_range and object_val in entity_types:
|
||||
actual_range = entity_types[object_val]
|
||||
if actual_range != expected_range:
|
||||
is_subclass = False
|
||||
curr_class = actual_range
|
||||
while curr_class in ontology_subset.classes:
|
||||
cls_def = ontology_subset.classes[curr_class]
|
||||
parent = cls_def.get('subclass_of') if isinstance(cls_def, dict) else None
|
||||
if parent == expected_range:
|
||||
is_subclass = True
|
||||
break
|
||||
curr_class = parent
|
||||
if not is_subclass:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def expand_uri(self, value: str, ontology_subset: OntologySubset, ontology_id: str = "unknown") -> str:
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import logging
|
|||
# Module logger
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Queue deletion retry settings
|
||||
# Topic deletion retry settings
|
||||
DELETE_RETRIES = 5
|
||||
DELETE_RETRY_DELAY = 2 # seconds
|
||||
|
||||
|
|
@ -215,11 +215,11 @@ class FlowConfig:
|
|||
|
||||
return result
|
||||
|
||||
# Pre-create flow-level queues so the data path is wired
|
||||
# Pre-create topic exchanges so the data path is wired
|
||||
# before processors receive their config and start connecting.
|
||||
queues = self._collect_flow_queues(cls, repl_template_with_params)
|
||||
for topic, subscription in queues:
|
||||
await self.pubsub.create_queue(topic, subscription)
|
||||
topics = self._collect_flow_topics(cls, repl_template_with_params)
|
||||
for topic in topics:
|
||||
await self.pubsub.create_topic(topic)
|
||||
|
||||
# Build all processor config updates, then write in a single batch.
|
||||
updates = []
|
||||
|
|
@ -283,8 +283,8 @@ class FlowConfig:
|
|||
error = None,
|
||||
)
|
||||
|
||||
async def ensure_existing_flow_queues(self):
|
||||
"""Ensure queues exist for all already-running flows.
|
||||
async def ensure_existing_flow_topics(self):
|
||||
"""Ensure topics exist for all already-running flows.
|
||||
|
||||
Called on startup to handle flows that were started before this
|
||||
version of the flow service was deployed, or before a restart.
|
||||
|
|
@ -315,7 +315,7 @@ class FlowConfig:
|
|||
if blueprint_data is None:
|
||||
logger.warning(
|
||||
f"Blueprint '{blueprint_name}' not found for "
|
||||
f"flow '{flow_id}', skipping queue creation"
|
||||
f"flow '{flow_id}', skipping topic creation"
|
||||
)
|
||||
continue
|
||||
|
||||
|
|
@ -333,65 +333,63 @@ class FlowConfig:
|
|||
)
|
||||
return result
|
||||
|
||||
queues = self._collect_flow_queues(cls, repl_template)
|
||||
for topic, subscription in queues:
|
||||
await self.pubsub.ensure_queue(topic, subscription)
|
||||
topics = self._collect_flow_topics(cls, repl_template)
|
||||
for topic in topics:
|
||||
await self.pubsub.ensure_topic(topic)
|
||||
|
||||
logger.info(
|
||||
f"Ensured queues for existing flow '{flow_id}'"
|
||||
f"Ensured topics for existing flow '{flow_id}'"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to ensure queues for flow '{flow_id}': {e}"
|
||||
f"Failed to ensure topics for flow '{flow_id}': {e}"
|
||||
)
|
||||
|
||||
def _collect_flow_queues(self, cls, repl_template):
|
||||
"""Collect (topic, subscription) pairs for all flow-level queues.
|
||||
def _collect_flow_topics(self, cls, repl_template):
|
||||
"""Collect unique topic identifiers from the blueprint.
|
||||
|
||||
Iterates the blueprint's "flow" section and reads only the
|
||||
"topics" dict from each processor entry.
|
||||
Iterates the blueprint's "flow" section and returns a
|
||||
deduplicated set of resolved topic strings. The flow service
|
||||
manages topic lifecycle (create/delete exchanges), not
|
||||
individual consumer queues.
|
||||
"""
|
||||
queues = []
|
||||
topics = set()
|
||||
|
||||
for k, v in cls["flow"].items():
|
||||
processor, variant = k.split(":", 1)
|
||||
variant = repl_template(variant)
|
||||
|
||||
for spec_name, topic_template in v.get("topics", {}).items():
|
||||
topic = repl_template(topic_template)
|
||||
subscription = f"{processor}--{variant}--{spec_name}"
|
||||
queues.append((topic, subscription))
|
||||
topics.add(topic)
|
||||
|
||||
return queues
|
||||
return topics
|
||||
|
||||
async def _delete_queues(self, queues):
|
||||
"""Delete queues with retries. Best-effort — logs failures but
|
||||
async def _delete_topics(self, topics):
|
||||
"""Delete topics with retries. Best-effort — logs failures but
|
||||
does not raise."""
|
||||
for attempt in range(DELETE_RETRIES):
|
||||
remaining = []
|
||||
|
||||
for topic, subscription in queues:
|
||||
for topic in topics:
|
||||
try:
|
||||
await self.pubsub.delete_queue(topic, subscription)
|
||||
await self.pubsub.delete_topic(topic)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Queue delete failed (attempt {attempt + 1}/"
|
||||
f"Topic delete failed (attempt {attempt + 1}/"
|
||||
f"{DELETE_RETRIES}): {topic}: {e}"
|
||||
)
|
||||
remaining.append((topic, subscription))
|
||||
remaining.append(topic)
|
||||
|
||||
if not remaining:
|
||||
return
|
||||
|
||||
queues = remaining
|
||||
topics = remaining
|
||||
|
||||
if attempt < DELETE_RETRIES - 1:
|
||||
await asyncio.sleep(DELETE_RETRY_DELAY)
|
||||
|
||||
for topic, subscription in queues:
|
||||
for topic in topics:
|
||||
logger.error(
|
||||
f"Failed to delete queue after {DELETE_RETRIES} "
|
||||
f"Failed to delete topic after {DELETE_RETRIES} "
|
||||
f"attempts: {topic}"
|
||||
)
|
||||
|
||||
|
|
@ -426,8 +424,8 @@ class FlowConfig:
|
|||
result = result.replace(f"{{{param_name}}}", str(param_value))
|
||||
return result
|
||||
|
||||
# Collect queue identifiers before removing config
|
||||
queues = self._collect_flow_queues(cls, repl_template)
|
||||
# Collect topic identifiers before removing config
|
||||
topics = self._collect_flow_topics(cls, repl_template)
|
||||
|
||||
# Phase 1: Set status to "stopping" and remove processor config.
|
||||
# The config push tells processors to shut down their consumers.
|
||||
|
|
@ -448,8 +446,8 @@ class FlowConfig:
|
|||
|
||||
await self.config.delete_many(deletes)
|
||||
|
||||
# Phase 2: Delete queues with retries, then remove the flow record.
|
||||
await self._delete_queues(queues)
|
||||
# Phase 2: Delete topics with retries, then remove the flow record.
|
||||
await self._delete_topics(topics)
|
||||
|
||||
if msg.flow_id in await self.config.keys("flow"):
|
||||
await self.config.delete("flow", msg.flow_id)
|
||||
|
|
|
|||
|
|
@ -101,11 +101,9 @@ class Processor(AsyncProcessor):
|
|||
|
||||
async def start(self):
|
||||
|
||||
await self.pubsub.ensure_queue(
|
||||
self.flow_request_topic, self.flow_request_subscriber
|
||||
)
|
||||
await self.pubsub.ensure_topic(self.flow_request_topic)
|
||||
await self.config_client.start()
|
||||
await self.flow.ensure_existing_flow_queues()
|
||||
await self.flow.ensure_existing_flow_topics()
|
||||
await self.flow_request_consumer.start()
|
||||
|
||||
async def on_flow_request(self, msg, consumer, flow):
|
||||
|
|
|
|||
|
|
@ -263,12 +263,8 @@ class Processor(AsyncProcessor):
|
|||
|
||||
async def start(self):
|
||||
|
||||
await self.pubsub.ensure_queue(
|
||||
self.librarian_request_topic, self.librarian_request_subscriber
|
||||
)
|
||||
await self.pubsub.ensure_queue(
|
||||
self.collection_request_topic, self.collection_request_subscriber
|
||||
)
|
||||
await self.pubsub.ensure_topic(self.librarian_request_topic)
|
||||
await self.pubsub.ensure_topic(self.collection_request_topic)
|
||||
await super(Processor, self).start()
|
||||
await self.librarian_request_consumer.start()
|
||||
await self.librarian_response_producer.start()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue