Fix/queue configurations (#585)

* Fix config-svc startup dupe CLI args

* Fix missing params on collection service

* Fix collection management handling
This commit is contained in:
cybermaggedon 2025-12-06 14:54:47 +00:00 committed by GitHub
parent ba95fa226b
commit 39f6a8b940
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 59 additions and 63 deletions

View file

@ -41,9 +41,7 @@ class Collection:
collection = v["collection"],
name = v["name"],
description = v["description"],
tags = v["tags"],
created_at = v["created_at"],
updated_at = v["updated_at"]
tags = v["tags"]
)
for v in collections
]
@ -76,9 +74,7 @@ class Collection:
collection = v["collection"],
name = v["name"],
description = v["description"],
tags = v["tags"],
created_at = v["created_at"],
updated_at = v["updated_at"]
tags = v["tags"]
)
return None
except Exception as e:

View file

@ -49,8 +49,6 @@ class CollectionMetadata:
name : str
description : str
tags : List[str]
created_at : str
updated_at : str
# Streaming chunk types

View file

@ -15,8 +15,6 @@ class CollectionManagementRequestTranslator(MessageTranslator):
name=data.get("name"),
description=data.get("description"),
tags=data.get("tags"),
created_at=data.get("created_at"),
updated_at=data.get("updated_at"),
tag_filter=data.get("tag_filter"),
limit=data.get("limit")
)
@ -38,10 +36,6 @@ class CollectionManagementRequestTranslator(MessageTranslator):
result["description"] = obj.description
if obj.tags is not None:
result["tags"] = list(obj.tags)
if obj.created_at is not None:
result["created_at"] = obj.created_at
if obj.updated_at is not None:
result["updated_at"] = obj.updated_at
if obj.tag_filter is not None:
result["tag_filter"] = list(obj.tag_filter)
if obj.limit is not None:
@ -73,9 +67,7 @@ class CollectionManagementResponseTranslator(MessageTranslator):
collection=coll_data.get("collection"),
name=coll_data.get("name"),
description=coll_data.get("description"),
tags=coll_data.get("tags"),
created_at=coll_data.get("created_at"),
updated_at=coll_data.get("updated_at")
tags=coll_data.get("tags", [])
))
return CollectionManagementResponse(
@ -104,9 +96,7 @@ class CollectionManagementResponseTranslator(MessageTranslator):
"collection": coll.collection,
"name": coll.name,
"description": coll.description,
"tags": list(coll.tags) if coll.tags else [],
"created_at": coll.created_at,
"updated_at": coll.updated_at
"tags": list(coll.tags) if coll.tags else []
})
print("RESULT IS", result, flush=True)

View file

@ -28,19 +28,17 @@ def list_collections(url, user, tag_filter):
collection.collection,
collection.name,
collection.description,
", ".join(collection.tags),
collection.created_at,
collection.updated_at
", ".join(collection.tags)
])
headers = ["Collection", "Name", "Description", "Tags", "Created", "Updated"]
headers = ["Collection", "Name", "Description", "Tags"]
print(tabulate.tabulate(
table,
headers=headers,
tablefmt="pretty",
stralign="left",
maxcolwidths=[20, 30, 50, 30, 19, 19],
maxcolwidths=[20, 30, 50, 30],
))
def main():

View file

@ -31,7 +31,6 @@ def set_collection(url, user, collection, name, description, tags, token=None):
table.append(("Name", result.name))
table.append(("Description", result.description))
table.append(("Tags", ", ".join(result.tags)))
table.append(("Updated", result.updated_at))
print(tabulate.tabulate(
table,

View file

@ -270,11 +270,7 @@ class Processor(AsyncProcessor):
help=f'Config response queue {default_config_response_queue}',
)
parser.add_argument(
'--config-push-queue',
default=default_config_push_queue,
help=f'Config push queue (default: {default_config_push_queue})'
)
# Note: --config-push-queue is already added by AsyncProcessor.add_args()
parser.add_argument(
'--flow-request-queue',

View file

@ -5,14 +5,20 @@ from ... messaging import TranslatorRegistry
from . requestor import ServiceRequestor
class CollectionManagementRequestor(ServiceRequestor):
def __init__(self, pulsar_client, consumer, subscriber, timeout=120):
def __init__(self, pulsar_client, consumer, subscriber, timeout=120,
request_queue=None, response_queue=None):
if request_queue is None:
request_queue = collection_request_queue
if response_queue is None:
response_queue = collection_response_queue
super(CollectionManagementRequestor, self).__init__(
pulsar_client=pulsar_client,
consumer_name = consumer,
subscription = subscriber,
request_queue=collection_request_queue,
response_queue=collection_response_queue,
request_queue=request_queue,
response_queue=response_queue,
request_schema=CollectionManagementRequest,
response_schema=CollectionManagementResponse,
timeout=timeout,

View file

@ -11,12 +11,22 @@ from typing import Dict, Any, List, Optional
from .. schema import CollectionManagementRequest, CollectionManagementResponse, Error
from .. schema import CollectionMetadata
from .. schema import ConfigRequest, ConfigResponse
from .. schema import ConfigRequest, ConfigResponse, ConfigKey, ConfigValue
from .. exceptions import RequestError
# Module logger
logger = logging.getLogger(__name__)
def metadata_to_dict(metadata: CollectionMetadata) -> dict:
"""Convert CollectionMetadata to dictionary for JSON serialization"""
return {
'user': metadata.user,
'collection': metadata.collection,
'name': metadata.name,
'description': metadata.description,
'tags': list(metadata.tags)
}
class CollectionManager:
"""Manages collection metadata via config service"""
@ -48,18 +58,22 @@ class CollectionManager:
Send config request and wait for response
Args:
request: Config service request
request: Config service request (without id field)
Returns:
ConfigResponse from config service
"""
event = asyncio.Event()
self.pending_config_requests[request.id] = event
# Generate request ID - passed via message properties, not in schema
request_id = str(uuid.uuid4())
await self.config_request_producer.send(request)
event = asyncio.Event()
self.pending_config_requests[request_id] = event
# Send request with ID in message properties
await self.config_request_producer.send(request, properties={"id": request_id})
await event.wait()
response = self.pending_config_requests.pop(request.id + "_response")
response = self.pending_config_requests.pop(request_id + "_response")
return response
async def on_config_response(self, message, consumer, flow):
@ -71,10 +85,12 @@ class CollectionManager:
consumer: Consumer instance
flow: Flow context
"""
response = message.value()
if response.id in self.pending_config_requests:
self.pending_config_requests[response.id + "_response"] = response
self.pending_config_requests[response.id].set()
# Get ID from message properties
response_id = message.properties().get("id")
if response_id and response_id in self.pending_config_requests:
response = message.value()
self.pending_config_requests[response_id + "_response"] = response
self.pending_config_requests[response_id].set()
async def ensure_collection_exists(self, user: str, collection: str):
"""
@ -87,10 +103,8 @@ class CollectionManager:
try:
# Check if collection exists via config service
request = ConfigRequest(
id=str(uuid.uuid4()),
operation='get',
type='collection',
keys=[f'{user}:{collection}']
keys=[ConfigKey(type='collection', key=f'{user}:{collection}')]
)
response = await self.send_config_request(request)
@ -112,11 +126,12 @@ class CollectionManager:
)
request = ConfigRequest(
id=str(uuid.uuid4()),
operation='put',
type='collection',
key=f'{user}:{collection}',
value=json.dumps(metadata.to_dict())
values=[ConfigValue(
type='collection',
key=f'{user}:{collection}',
value=json.dumps(metadata_to_dict(metadata))
)]
)
response = await self.send_config_request(request)
@ -143,7 +158,6 @@ class CollectionManager:
try:
# Get all collections from config service
config_request = ConfigRequest(
id=str(uuid.uuid4()),
operation='getvalues',
type='collection'
)
@ -155,11 +169,11 @@ class CollectionManager:
# Parse collections and filter by user
collections = []
for key, value_json in response.values.items():
if ":" in key:
coll_user, coll_name = key.split(":", 1)
for config_value in response.values:
if ":" in config_value.key:
coll_user, coll_name = config_value.key.split(":", 1)
if coll_user == request.user:
metadata_dict = json.loads(value_json)
metadata_dict = json.loads(config_value.value)
metadata = CollectionMetadata(**metadata_dict)
collections.append(metadata)
@ -211,11 +225,12 @@ class CollectionManager:
# Send put request to config service
config_request = ConfigRequest(
id=str(uuid.uuid4()),
operation='put',
type='collection',
key=f'{request.user}:{request.collection}',
value=json.dumps(metadata.to_dict())
values=[ConfigValue(
type='collection',
key=f'{request.user}:{request.collection}',
value=json.dumps(metadata_to_dict(metadata))
)]
)
response = await self.send_config_request(config_request)
@ -253,10 +268,8 @@ class CollectionManager:
# Send delete request to config service
config_request = ConfigRequest(
id=str(uuid.uuid4()),
operation='delete',
type='collection',
key=f'{request.user}:{request.collection}'
keys=[ConfigKey(type='collection', key=f'{request.user}:{request.collection}')]
)
response = await self.send_config_request(config_request)