From 39f6a8b94099c1769238f35b7c2f710b5a108d7e Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Sat, 6 Dec 2025 14:54:47 +0000 Subject: [PATCH] Fix/queue configurations (#585) * Fix config-svc startup dupe CLI args * Fix missing params on collection service * Fix collection management handling --- trustgraph-base/trustgraph/api/collection.py | 8 +-- trustgraph-base/trustgraph/api/types.py | 2 - .../messaging/translators/collection.py | 14 +--- .../trustgraph/cli/list_collections.py | 8 +-- .../trustgraph/cli/set_collection.py | 1 - .../trustgraph/config/service/service.py | 6 +- .../gateway/dispatch/collection_management.py | 12 +++- .../librarian/collection_manager.py | 71 +++++++++++-------- 8 files changed, 59 insertions(+), 63 deletions(-) diff --git a/trustgraph-base/trustgraph/api/collection.py b/trustgraph-base/trustgraph/api/collection.py index 0e1abeaf..5a1f0850 100644 --- a/trustgraph-base/trustgraph/api/collection.py +++ b/trustgraph-base/trustgraph/api/collection.py @@ -41,9 +41,7 @@ class Collection: collection = v["collection"], name = v["name"], description = v["description"], - tags = v["tags"], - created_at = v["created_at"], - updated_at = v["updated_at"] + tags = v["tags"] ) for v in collections ] @@ -76,9 +74,7 @@ class Collection: collection = v["collection"], name = v["name"], description = v["description"], - tags = v["tags"], - created_at = v["created_at"], - updated_at = v["updated_at"] + tags = v["tags"] ) return None except Exception as e: diff --git a/trustgraph-base/trustgraph/api/types.py b/trustgraph-base/trustgraph/api/types.py index bba566a6..a8608853 100644 --- a/trustgraph-base/trustgraph/api/types.py +++ b/trustgraph-base/trustgraph/api/types.py @@ -49,8 +49,6 @@ class CollectionMetadata: name : str description : str tags : List[str] - created_at : str - updated_at : str # Streaming chunk types diff --git a/trustgraph-base/trustgraph/messaging/translators/collection.py b/trustgraph-base/trustgraph/messaging/translators/collection.py index 38ac813b..22c82828 100644 --- a/trustgraph-base/trustgraph/messaging/translators/collection.py +++ b/trustgraph-base/trustgraph/messaging/translators/collection.py @@ -15,8 +15,6 @@ class CollectionManagementRequestTranslator(MessageTranslator): name=data.get("name"), description=data.get("description"), tags=data.get("tags"), - created_at=data.get("created_at"), - updated_at=data.get("updated_at"), tag_filter=data.get("tag_filter"), limit=data.get("limit") ) @@ -38,10 +36,6 @@ class CollectionManagementRequestTranslator(MessageTranslator): result["description"] = obj.description if obj.tags is not None: result["tags"] = list(obj.tags) - if obj.created_at is not None: - result["created_at"] = obj.created_at - if obj.updated_at is not None: - result["updated_at"] = obj.updated_at if obj.tag_filter is not None: result["tag_filter"] = list(obj.tag_filter) if obj.limit is not None: @@ -73,9 +67,7 @@ class CollectionManagementResponseTranslator(MessageTranslator): collection=coll_data.get("collection"), name=coll_data.get("name"), description=coll_data.get("description"), - tags=coll_data.get("tags"), - created_at=coll_data.get("created_at"), - updated_at=coll_data.get("updated_at") + tags=coll_data.get("tags", []) )) return CollectionManagementResponse( @@ -104,9 +96,7 @@ class CollectionManagementResponseTranslator(MessageTranslator): "collection": coll.collection, "name": coll.name, "description": coll.description, - "tags": list(coll.tags) if coll.tags else [], - "created_at": coll.created_at, - "updated_at": coll.updated_at + "tags": list(coll.tags) if coll.tags else [] }) print("RESULT IS", result, flush=True) diff --git a/trustgraph-cli/trustgraph/cli/list_collections.py b/trustgraph-cli/trustgraph/cli/list_collections.py index 56929e93..4086f471 100644 --- a/trustgraph-cli/trustgraph/cli/list_collections.py +++ b/trustgraph-cli/trustgraph/cli/list_collections.py @@ -28,19 +28,17 @@ def list_collections(url, user, tag_filter): collection.collection, collection.name, collection.description, - ", ".join(collection.tags), - collection.created_at, - collection.updated_at + ", ".join(collection.tags) ]) - headers = ["Collection", "Name", "Description", "Tags", "Created", "Updated"] + headers = ["Collection", "Name", "Description", "Tags"] print(tabulate.tabulate( table, headers=headers, tablefmt="pretty", stralign="left", - maxcolwidths=[20, 30, 50, 30, 19, 19], + maxcolwidths=[20, 30, 50, 30], )) def main(): diff --git a/trustgraph-cli/trustgraph/cli/set_collection.py b/trustgraph-cli/trustgraph/cli/set_collection.py index 1a39eb08..dd4148ea 100644 --- a/trustgraph-cli/trustgraph/cli/set_collection.py +++ b/trustgraph-cli/trustgraph/cli/set_collection.py @@ -31,7 +31,6 @@ def set_collection(url, user, collection, name, description, tags, token=None): table.append(("Name", result.name)) table.append(("Description", result.description)) table.append(("Tags", ", ".join(result.tags))) - table.append(("Updated", result.updated_at)) print(tabulate.tabulate( table, diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index 0a8ee8a1..414ad847 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -270,11 +270,7 @@ class Processor(AsyncProcessor): help=f'Config response queue {default_config_response_queue}', ) - parser.add_argument( - '--config-push-queue', - default=default_config_push_queue, - help=f'Config push queue (default: {default_config_push_queue})' - ) + # Note: --config-push-queue is already added by AsyncProcessor.add_args() parser.add_argument( '--flow-request-queue', diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/collection_management.py b/trustgraph-flow/trustgraph/gateway/dispatch/collection_management.py index f2755ae8..9773ad4c 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/collection_management.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/collection_management.py @@ -5,14 +5,20 @@ from ... messaging import TranslatorRegistry from . requestor import ServiceRequestor class CollectionManagementRequestor(ServiceRequestor): - def __init__(self, pulsar_client, consumer, subscriber, timeout=120): + def __init__(self, pulsar_client, consumer, subscriber, timeout=120, + request_queue=None, response_queue=None): + + if request_queue is None: + request_queue = collection_request_queue + if response_queue is None: + response_queue = collection_response_queue super(CollectionManagementRequestor, self).__init__( pulsar_client=pulsar_client, consumer_name = consumer, subscription = subscriber, - request_queue=collection_request_queue, - response_queue=collection_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=CollectionManagementRequest, response_schema=CollectionManagementResponse, timeout=timeout, diff --git a/trustgraph-flow/trustgraph/librarian/collection_manager.py b/trustgraph-flow/trustgraph/librarian/collection_manager.py index d3e1b369..fffb0358 100644 --- a/trustgraph-flow/trustgraph/librarian/collection_manager.py +++ b/trustgraph-flow/trustgraph/librarian/collection_manager.py @@ -11,12 +11,22 @@ from typing import Dict, Any, List, Optional from .. schema import CollectionManagementRequest, CollectionManagementResponse, Error from .. schema import CollectionMetadata -from .. schema import ConfigRequest, ConfigResponse +from .. schema import ConfigRequest, ConfigResponse, ConfigKey, ConfigValue from .. exceptions import RequestError # Module logger logger = logging.getLogger(__name__) +def metadata_to_dict(metadata: CollectionMetadata) -> dict: + """Convert CollectionMetadata to dictionary for JSON serialization""" + return { + 'user': metadata.user, + 'collection': metadata.collection, + 'name': metadata.name, + 'description': metadata.description, + 'tags': list(metadata.tags) + } + class CollectionManager: """Manages collection metadata via config service""" @@ -48,18 +58,22 @@ class CollectionManager: Send config request and wait for response Args: - request: Config service request + request: Config service request (without id field) Returns: ConfigResponse from config service """ - event = asyncio.Event() - self.pending_config_requests[request.id] = event + # Generate request ID - passed via message properties, not in schema + request_id = str(uuid.uuid4()) - await self.config_request_producer.send(request) + event = asyncio.Event() + self.pending_config_requests[request_id] = event + + # Send request with ID in message properties + await self.config_request_producer.send(request, properties={"id": request_id}) await event.wait() - response = self.pending_config_requests.pop(request.id + "_response") + response = self.pending_config_requests.pop(request_id + "_response") return response async def on_config_response(self, message, consumer, flow): @@ -71,10 +85,12 @@ class CollectionManager: consumer: Consumer instance flow: Flow context """ - response = message.value() - if response.id in self.pending_config_requests: - self.pending_config_requests[response.id + "_response"] = response - self.pending_config_requests[response.id].set() + # Get ID from message properties + response_id = message.properties().get("id") + if response_id and response_id in self.pending_config_requests: + response = message.value() + self.pending_config_requests[response_id + "_response"] = response + self.pending_config_requests[response_id].set() async def ensure_collection_exists(self, user: str, collection: str): """ @@ -87,10 +103,8 @@ class CollectionManager: try: # Check if collection exists via config service request = ConfigRequest( - id=str(uuid.uuid4()), operation='get', - type='collection', - keys=[f'{user}:{collection}'] + keys=[ConfigKey(type='collection', key=f'{user}:{collection}')] ) response = await self.send_config_request(request) @@ -112,11 +126,12 @@ class CollectionManager: ) request = ConfigRequest( - id=str(uuid.uuid4()), operation='put', - type='collection', - key=f'{user}:{collection}', - value=json.dumps(metadata.to_dict()) + values=[ConfigValue( + type='collection', + key=f'{user}:{collection}', + value=json.dumps(metadata_to_dict(metadata)) + )] ) response = await self.send_config_request(request) @@ -143,7 +158,6 @@ class CollectionManager: try: # Get all collections from config service config_request = ConfigRequest( - id=str(uuid.uuid4()), operation='getvalues', type='collection' ) @@ -155,11 +169,11 @@ class CollectionManager: # Parse collections and filter by user collections = [] - for key, value_json in response.values.items(): - if ":" in key: - coll_user, coll_name = key.split(":", 1) + for config_value in response.values: + if ":" in config_value.key: + coll_user, coll_name = config_value.key.split(":", 1) if coll_user == request.user: - metadata_dict = json.loads(value_json) + metadata_dict = json.loads(config_value.value) metadata = CollectionMetadata(**metadata_dict) collections.append(metadata) @@ -211,11 +225,12 @@ class CollectionManager: # Send put request to config service config_request = ConfigRequest( - id=str(uuid.uuid4()), operation='put', - type='collection', - key=f'{request.user}:{request.collection}', - value=json.dumps(metadata.to_dict()) + values=[ConfigValue( + type='collection', + key=f'{request.user}:{request.collection}', + value=json.dumps(metadata_to_dict(metadata)) + )] ) response = await self.send_config_request(config_request) @@ -253,10 +268,8 @@ class CollectionManager: # Send delete request to config service config_request = ConfigRequest( - id=str(uuid.uuid4()), operation='delete', - type='collection', - key=f'{request.user}:{request.collection}' + keys=[ConfigKey(type='collection', key=f'{request.user}:{request.collection}')] ) response = await self.send_config_request(config_request)