From 3e23d3c3ed7d13a773860a8d46be27d91b81ef23 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 30 Sep 2025 23:04:28 +0100 Subject: [PATCH] Fix collection management sync prob (#544) * Address creation/deletion sync problems * Fix object writer management * Get Milvus to use ANN --- .../direct/milvus_doc_embeddings.py | 11 ++--------- .../direct/milvus_graph_embeddings.py | 11 ++--------- .../librarian/collection_manager.py | 10 +++++----- .../storage/objects/cassandra/write.py | 19 ++++++++++--------- 4 files changed, 19 insertions(+), 32 deletions(-) diff --git a/trustgraph-flow/trustgraph/direct/milvus_doc_embeddings.py b/trustgraph-flow/trustgraph/direct/milvus_doc_embeddings.py index 131c114a..a96d06df 100644 --- a/trustgraph-flow/trustgraph/direct/milvus_doc_embeddings.py +++ b/trustgraph-flow/trustgraph/direct/milvus_doc_embeddings.py @@ -144,14 +144,6 @@ class DocVectors: coll = self.collections[(dim, user, collection)] - search_params = { - "metric_type": "COSINE", - "params": { - "radius": 0.1, - "range_filter": 0.8 - } - } - logger.debug("Loading...") self.client.load_collection( collection_name=coll, @@ -161,10 +153,11 @@ class DocVectors: res = self.client.search( collection_name=coll, + anns_field="vector", data=[embeds], limit=limit, output_fields=fields, - search_params=search_params, + search_params={ "metric_type": "COSINE" }, )[0] diff --git a/trustgraph-flow/trustgraph/direct/milvus_graph_embeddings.py b/trustgraph-flow/trustgraph/direct/milvus_graph_embeddings.py index 7c2cb55b..b3ed2a9f 100644 --- a/trustgraph-flow/trustgraph/direct/milvus_graph_embeddings.py +++ b/trustgraph-flow/trustgraph/direct/milvus_graph_embeddings.py @@ -144,14 +144,6 @@ class EntityVectors: coll = self.collections[(dim, user, collection)] - search_params = { - "metric_type": "COSINE", - "params": { - "radius": 0.1, - "range_filter": 0.8 - } - } - logger.debug("Loading...") self.client.load_collection( collection_name=coll, @@ -161,10 +153,11 @@ class EntityVectors: res = self.client.search( collection_name=coll, + anns_field="vector", data=[embeds], limit=limit, output_fields=fields, - search_params=search_params, + search_params={ "metric_type": "COSINE" }, )[0] diff --git a/trustgraph-flow/trustgraph/librarian/collection_manager.py b/trustgraph-flow/trustgraph/librarian/collection_manager.py index ea67fa3f..1530ed84 100644 --- a/trustgraph-flow/trustgraph/librarian/collection_manager.py +++ b/trustgraph-flow/trustgraph/librarian/collection_manager.py @@ -88,7 +88,7 @@ class CollectionManager: logger.info(f"Broadcasting create-collection for {creation_key}") self.pending_deletions[creation_key] = { - "responses_pending": 3, # vector, object, triples + "responses_pending": 4, # doc-embeddings, graph-embeddings, object, triples "responses_received": [], "all_successful": True, "error_messages": [], @@ -213,7 +213,7 @@ class CollectionManager: logger.info(f"Broadcasting create-collection for {creation_key}") self.pending_deletions[creation_key] = { - "responses_pending": 3, # vector, object, triples + "responses_pending": 4, # doc-embeddings, graph-embeddings, object, triples "responses_received": [], "all_successful": True, "error_messages": [], @@ -328,7 +328,7 @@ class CollectionManager: # Track this deletion request self.pending_deletions[deletion_key] = { - "responses_pending": 3, # vector, object, triples + "responses_pending": 4, # doc-embeddings, graph-embeddings, object, triples "responses_received": [], "all_successful": True, "error_messages": [], @@ -418,9 +418,9 @@ class CollectionManager: if response.error and response.error.message: info["all_successful"] = False info["error_messages"].append(response.error.message) - logger.warning(f"Storage deletion failed for {deletion_key}: {response.error.message}") + logger.warning(f"Storage operation failed for {deletion_key}: {response.error.message}") else: - logger.debug(f"Storage deletion succeeded for {deletion_key}") + logger.debug(f"Storage operation succeeded for {deletion_key}") # If all responses received, signal completion if info["responses_pending"] == 0: diff --git a/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py b/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py index 90578438..053dbcb2 100644 --- a/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py @@ -456,35 +456,36 @@ class Processor(FlowProcessor): async def on_storage_management(self, msg, consumer, flow): """Handle storage management requests for collection operations""" - logger.info(f"Received storage management request: {msg.operation} for {msg.user}/{msg.collection}") + request = msg.value() + logger.info(f"Received storage management request: {request.operation} for {request.user}/{request.collection}") try: - if msg.operation == "create-collection": - await self.create_collection(msg.user, msg.collection) + if request.operation == "create-collection": + await self.create_collection(request.user, request.collection) # Send success response response = StorageManagementResponse( error=None # No error means success ) await self.storage_response_producer.send(response) - logger.info(f"Successfully created collection {msg.user}/{msg.collection}") - elif msg.operation == "delete-collection": - await self.delete_collection(msg.user, msg.collection) + logger.info(f"Successfully created collection {request.user}/{request.collection}") + elif request.operation == "delete-collection": + await self.delete_collection(request.user, request.collection) # Send success response response = StorageManagementResponse( error=None # No error means success ) await self.storage_response_producer.send(response) - logger.info(f"Successfully deleted collection {msg.user}/{msg.collection}") + logger.info(f"Successfully deleted collection {request.user}/{request.collection}") else: - logger.warning(f"Unknown storage management operation: {msg.operation}") + logger.warning(f"Unknown storage management operation: {request.operation}") # Send error response from .... schema import Error response = StorageManagementResponse( error=Error( type="unknown_operation", - message=f"Unknown operation: {msg.operation}" + message=f"Unknown operation: {request.operation}" ) ) await self.storage_response_producer.send(response)