Collection management (#520)

* Tech spec

* Refactored the Cassandra knowledge graph to use a single unified table (see the sketch below the commit summary)

* Collection management: librarian services to manage collection metadata and cascade collection deletion across all storage backends
cybermaggedon 2025-09-18 15:57:52 +01:00 committed by GitHub
parent 48016d8fb2
commit 13ff7d765d
48 changed files with 2941 additions and 425 deletions
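
For orientation, a minimal usage sketch of the refactored single-table API follows. The collection is now the partition key of a single "triples" table, so every call takes a collection argument and dropping a collection is one partition-level delete. The absolute import path is an assumption; the PR itself only shows relative imports.

# Minimal usage sketch of the refactored KnowledgeGraph (import path assumed;
# the PR only shows relative imports such as ".... direct.cassandra_kg").
from trustgraph.direct.cassandra_kg import KnowledgeGraph

kg = KnowledgeGraph(
    hosts=["localhost"],      # defaults to ["localhost"] if omitted
    keyspace="trustgraph",    # one keyspace per user; the table is always "triples"
)

# Every operation is scoped by collection, the new partition key.
kg.insert("my-collection", "urn:ex:alice", "urn:ex:knows", "urn:ex:bob")

for row in kg.get_sp("my-collection", "urn:ex:alice", "urn:ex:knows", limit=10):
    print(row.o)

# Because collection is the partition key, this is a single partition delete.
kg.delete_collection("my-collection")

kg.close()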

View file

@ -6,18 +6,18 @@ from ssl import SSLContext, PROTOCOL_TLSv1_2
# Global list to track clusters for cleanup
_active_clusters = []
class TrustGraph:
class KnowledgeGraph:
def __init__(
self, hosts=None,
keyspace="trustgraph", table="default", username=None, password=None
keyspace="trustgraph", username=None, password=None
):
if hosts is None:
hosts = ["localhost"]
self.keyspace = keyspace
self.table = table
self.table = "triples" # Fixed table name for unified schema
self.username = username
if username and password:
@ -55,13 +55,19 @@ class TrustGraph:
self.session.execute(f"""
create table if not exists {self.table} (
collection text,
s text,
p text,
o text,
PRIMARY KEY (s, p, o)
PRIMARY KEY (collection, s, p, o)
);
""");
self.session.execute(f"""
create index if not exists {self.table}_s
ON {self.table} (s);
""");
self.session.execute(f"""
create index if not exists {self.table}_p
ON {self.table} (p);
@ -72,58 +78,66 @@ class TrustGraph:
ON {self.table} (o);
""");
def insert(self, s, p, o):
def insert(self, collection, s, p, o):
self.session.execute(
f"insert into {self.table} (s, p, o) values (%s, %s, %s)",
(s, p, o)
f"insert into {self.table} (collection, s, p, o) values (%s, %s, %s, %s)",
(collection, s, p, o)
)
def get_all(self, limit=50):
def get_all(self, collection, limit=50):
return self.session.execute(
f"select s, p, o from {self.table} limit {limit}"
f"select s, p, o from {self.table} where collection = %s limit {limit}",
(collection,)
)
def get_s(self, s, limit=10):
def get_s(self, collection, s, limit=10):
return self.session.execute(
f"select p, o from {self.table} where s = %s limit {limit}",
(s,)
f"select p, o from {self.table} where collection = %s and s = %s limit {limit}",
(collection, s)
)
def get_p(self, p, limit=10):
def get_p(self, collection, p, limit=10):
return self.session.execute(
f"select s, o from {self.table} where p = %s limit {limit}",
(p,)
f"select s, o from {self.table} where collection = %s and p = %s limit {limit}",
(collection, p)
)
def get_o(self, o, limit=10):
def get_o(self, collection, o, limit=10):
return self.session.execute(
f"select s, p from {self.table} where o = %s limit {limit}",
(o,)
f"select s, p from {self.table} where collection = %s and o = %s limit {limit}",
(collection, o)
)
def get_sp(self, s, p, limit=10):
def get_sp(self, collection, s, p, limit=10):
return self.session.execute(
f"select o from {self.table} where s = %s and p = %s limit {limit}",
(s, p)
f"select o from {self.table} where collection = %s and s = %s and p = %s limit {limit}",
(collection, s, p)
)
def get_po(self, p, o, limit=10):
def get_po(self, collection, p, o, limit=10):
return self.session.execute(
f"select s from {self.table} where p = %s and o = %s limit {limit} allow filtering",
(p, o)
f"select s from {self.table} where collection = %s and p = %s and o = %s limit {limit} allow filtering",
(collection, p, o)
)
def get_os(self, o, s, limit=10):
def get_os(self, collection, o, s, limit=10):
return self.session.execute(
f"select p from {self.table} where o = %s and s = %s limit {limit}",
(o, s)
f"select p from {self.table} where collection = %s and o = %s and s = %s limit {limit} allow filtering",
(collection, o, s)
)
def get_spo(self, s, p, o, limit=10):
def get_spo(self, collection, s, p, o, limit=10):
return self.session.execute(
f"""select s as x from {self.table} where s = %s and p = %s and o = %s limit {limit}""",
(s, p, o)
f"""select s as x from {self.table} where collection = %s and s = %s and p = %s and o = %s limit {limit}""",
(collection, s, p, o)
)
def delete_collection(self, collection):
"""Delete all triples for a specific collection"""
self.session.execute(
f"delete from {self.table} where collection = %s",
(collection,)
)
def close(self):

View file

@ -6,7 +6,7 @@ import re
logger = logging.getLogger(__name__)
def make_safe_collection_name(user, collection, dimension, prefix):
def make_safe_collection_name(user, collection, prefix):
"""
Create a safe Milvus collection name from user/collection parameters.
Milvus only allows letters, numbers, and underscores.
@ -26,7 +26,7 @@ def make_safe_collection_name(user, collection, dimension, prefix):
safe_user = sanitize(user)
safe_collection = sanitize(collection)
return f"{prefix}_{safe_user}_{safe_collection}_{dimension}"
return f"{prefix}_{safe_user}_{safe_collection}"
class DocVectors:
@ -51,7 +51,7 @@ class DocVectors:
def init_collection(self, dimension, user, collection):
collection_name = make_safe_collection_name(user, collection, dimension, self.prefix)
collection_name = make_safe_collection_name(user, collection, self.prefix)
pkey_field = FieldSchema(
name="id",
@ -162,3 +162,20 @@ class DocVectors:
return res
def delete_collection(self, user, collection):
"""Delete a collection for the given user and collection"""
collection_name = make_safe_collection_name(user, collection, self.prefix)
# Check if collection exists
if self.client.has_collection(collection_name):
# Drop the collection
self.client.drop_collection(collection_name)
logger.info(f"Deleted Milvus collection: {collection_name}")
# Remove from our local cache
keys_to_remove = [key for key in self.collections.keys() if key[1] == user and key[2] == collection]
for key in keys_to_remove:
del self.collections[key]
else:
logger.info(f"Collection {collection_name} does not exist, nothing to delete")

View file

@ -6,7 +6,7 @@ import re
logger = logging.getLogger(__name__)
def make_safe_collection_name(user, collection, dimension, prefix):
def make_safe_collection_name(user, collection, prefix):
"""
Create a safe Milvus collection name from user/collection parameters.
Milvus only allows letters, numbers, and underscores.
@ -26,7 +26,7 @@ def make_safe_collection_name(user, collection, dimension, prefix):
safe_user = sanitize(user)
safe_collection = sanitize(collection)
return f"{prefix}_{safe_user}_{safe_collection}_{dimension}"
return f"{prefix}_{safe_user}_{safe_collection}"
class EntityVectors:
@ -51,7 +51,7 @@ class EntityVectors:
def init_collection(self, dimension, user, collection):
collection_name = make_safe_collection_name(user, collection, dimension, self.prefix)
collection_name = make_safe_collection_name(user, collection, self.prefix)
pkey_field = FieldSchema(
name="id",
@ -162,3 +162,20 @@ class EntityVectors:
return res
def delete_collection(self, user, collection):
"""Delete a collection for the given user and collection"""
collection_name = make_safe_collection_name(user, collection, self.prefix)
# Check if collection exists
if self.client.has_collection(collection_name):
# Drop the collection
self.client.drop_collection(collection_name)
logger.info(f"Deleted Milvus collection: {collection_name}")
# Remove from our local cache
keys_to_remove = [key for key in self.collections.keys() if key[1] == user and key[2] == collection]
for key in keys_to_remove:
del self.collections[key]
else:
logger.info(f"Collection {collection_name} does not exist, nothing to delete")

View file

@ -0,0 +1,28 @@
from ... schema import CollectionManagementRequest, CollectionManagementResponse
from ... schema import collection_request_queue, collection_response_queue
from ... messaging import TranslatorRegistry
from . requestor import ServiceRequestor
class CollectionManagementRequestor(ServiceRequestor):
def __init__(self, pulsar_client, consumer, subscriber, timeout=120):
super(CollectionManagementRequestor, self).__init__(
pulsar_client=pulsar_client,
consumer_name = consumer,
subscription = subscriber,
request_queue=collection_request_queue,
response_queue=collection_response_queue,
request_schema=CollectionManagementRequest,
response_schema=CollectionManagementResponse,
timeout=timeout,
)
self.request_translator = TranslatorRegistry.get_request_translator("collection-management")
self.response_translator = TranslatorRegistry.get_response_translator("collection-management")
def to_request(self, body):
return self.request_translator.to_pulsar(body)
def from_response(self, message):
return self.response_translator.from_response_with_completion(message)

View file

@ -11,6 +11,7 @@ from . config import ConfigRequestor
from . flow import FlowRequestor
from . librarian import LibrarianRequestor
from . knowledge import KnowledgeRequestor
from . collection_management import CollectionManagementRequestor
from . embeddings import EmbeddingsRequestor
from . agent import AgentRequestor
@ -66,6 +67,7 @@ global_dispatchers = {
"flow": FlowRequestor,
"librarian": LibrarianRequestor,
"knowledge": KnowledgeRequestor,
"collection-management": CollectionManagementRequestor,
}
sender_dispatchers = {

View file

@ -0,0 +1,362 @@
"""
Collection management service for the librarian
"""
import asyncio
import logging
from datetime import datetime
from .. base import AsyncProcessor, Consumer, Producer
from .. base import ConsumerMetrics, ProducerMetrics
from .. base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from .. schema import CollectionManagementRequest, CollectionManagementResponse, Error
from .. schema import collection_request_queue, collection_response_queue
from .. schema import CollectionMetadata
from .. schema import StorageManagementRequest, StorageManagementResponse
from .. schema import vector_storage_management_topic, object_storage_management_topic, triples_storage_management_topic, storage_management_response_topic
from .. exceptions import RequestError
from .. tables.library import LibraryTableStore
# Module logger
logger = logging.getLogger(__name__)
default_ident = "collection-management"
default_cassandra_host = "cassandra"
keyspace = "librarian"
class Processor(AsyncProcessor):
def __init__(self, **params):
id = params.get("id", default_ident)
# Get Cassandra configuration
cassandra_host = params.get("cassandra_host", default_cassandra_host)
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password
)
super(Processor, self).__init__(
**params | {
"cassandra_host": ','.join(hosts),
"cassandra_username": username
}
)
self.cassandra_host = hosts
self.cassandra_username = username
self.cassandra_password = password
# Set up metrics
collection_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="collection-request"
)
collection_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="collection-response"
)
# Set up consumer for collection management requests
self.collection_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=collection_request_queue,
subscriber=id,
schema=CollectionManagementRequest,
handler=self.on_collection_request,
metrics=collection_request_metrics,
)
# Set up producer for collection management responses
self.collection_response_producer = Producer(
client=self.pulsar_client,
topic=collection_response_queue,
schema=CollectionManagementResponse,
metrics=collection_response_metrics,
)
# Set up producers for storage management requests
self.vector_storage_producer = Producer(
client=self.pulsar_client,
topic=vector_storage_management_topic,
schema=StorageManagementRequest,
)
self.object_storage_producer = Producer(
client=self.pulsar_client,
topic=object_storage_management_topic,
schema=StorageManagementRequest,
)
self.triples_storage_producer = Producer(
client=self.pulsar_client,
topic=triples_storage_management_topic,
schema=StorageManagementRequest,
)
# Set up consumer for storage management responses
storage_response_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-response"
)
self.storage_response_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=storage_management_response_topic,
subscriber=f"{id}-storage",
schema=StorageManagementResponse,
handler=self.on_storage_response,
metrics=storage_response_metrics,
)
# Initialize table store
self.table_store = LibraryTableStore(
cassandra_host=self.cassandra_host,
cassandra_username=self.cassandra_username,
cassandra_password=self.cassandra_password,
keyspace=keyspace
)
# Track pending deletion requests by user+collection
self.pending_deletions = {} # (user, collection) -> {responses_pending, responses_received, all_successful, error_messages, deletion_complete}
async def on_collection_request(self, message):
"""Handle collection management requests"""
logger.debug(f"Collection request: {message.operation}")
try:
if message.operation == "list-collections":
response = await self.handle_list_collections(message)
elif message.operation == "update-collection":
response = await self.handle_update_collection(message)
elif message.operation == "delete-collection":
response = await self.handle_delete_collection(message)
else:
response = CollectionManagementResponse(
success="false",
error=Error(
type="invalid_operation",
message=f"Unknown operation: {message.operation}"
),
timestamp=datetime.now().isoformat()
)
except Exception as e:
logger.error(f"Error processing collection request: {e}", exc_info=True)
response = CollectionManagementResponse(
success="false",
error=Error(
type="processing_error",
message=str(e)
),
timestamp=datetime.now().isoformat()
)
await self.collection_response_producer.send(response)
async def on_storage_response(self, response):
"""Handle storage management responses"""
logger.debug(f"Received storage response: error={response.error}")
# Correlate by user+collection: the response carries no deletion id, so it is
# attributed to the first pending deletion that is still awaiting responses.
# This is a simplified correlation mechanism; production would want an
# explicit correlation id on the response.
for deletion_key, info in list(self.pending_deletions.items()):
if info["responses_pending"] > 0:
# Record this response
info["responses_received"].append(response)
info["responses_pending"] -= 1
# Check if this response indicates failure
if response.error and response.error.message:
info["all_successful"] = False
info["error_messages"].append(response.error.message)
logger.warning(f"Storage deletion failed for {deletion_key}: {response.error.message}")
else:
logger.debug(f"Storage deletion succeeded for {deletion_key}")
# If all responses received, signal completion
if info["responses_pending"] == 0:
logger.info(f"All storage responses received for {deletion_key}")
info["deletion_complete"].set()
break # Only attribute the response to the first matching deletion
async def handle_list_collections(self, message):
"""Handle list collections request"""
try:
tag_filter = list(message.tag_filter) if message.tag_filter else None
collections = await self.table_store.list_collections(message.user, tag_filter)
collection_metadata = [
CollectionMetadata(
user=coll["user"],
collection=coll["collection"],
name=coll["name"],
description=coll["description"],
tags=coll["tags"],
created_at=coll["created_at"],
updated_at=coll["updated_at"]
)
for coll in collections
]
return CollectionManagementResponse(
success="true",
collections=collection_metadata,
timestamp=datetime.now().isoformat()
)
except Exception as e:
logger.error(f"Error listing collections: {e}")
raise
async def handle_update_collection(self, message):
"""Handle update collection request"""
try:
# Extract fields for update
name = message.name if message.name else None
description = message.description if message.description else None
tags = list(message.tags) if message.tags else None
updated_collection = await self.table_store.update_collection(
message.user, message.collection, name, description, tags
)
collection_metadata = CollectionMetadata(
user=updated_collection["user"],
collection=updated_collection["collection"],
name=updated_collection["name"],
description=updated_collection["description"],
tags=updated_collection["tags"],
created_at="", # Not returned by update
updated_at=updated_collection["updated_at"]
)
return CollectionManagementResponse(
success="true",
collections=[collection_metadata],
timestamp=datetime.now().isoformat()
)
except Exception as e:
logger.error(f"Error updating collection: {e}")
raise
async def handle_delete_collection(self, message):
"""Handle delete collection request with cascade to all storage types"""
try:
deletion_key = (message.user, message.collection)
logger.info(f"Starting cascade deletion for {message.user}/{message.collection}")
# Track this deletion request
self.pending_deletions[deletion_key] = {
"responses_pending": 3, # vector, object, triples
"responses_received": [],
"all_successful": True,
"error_messages": [],
"deletion_complete": asyncio.Event()
}
# Create storage management request
storage_request = StorageManagementRequest(
operation="delete-collection",
user=message.user,
collection=message.collection
)
# Send delete requests to all three storage types
await self.vector_storage_producer.send(storage_request)
await self.object_storage_producer.send(storage_request)
await self.triples_storage_producer.send(storage_request)
logger.info(f"Storage deletion requests sent for {message.user}/{message.collection}")
# Wait for all storage responses (with timeout)
try:
await asyncio.wait_for(
self.pending_deletions[deletion_key]["deletion_complete"].wait(),
timeout=30.0 # 30 second timeout
)
except asyncio.TimeoutError:
logger.error(f"Timeout waiting for storage responses for {deletion_key}")
self.pending_deletions[deletion_key]["all_successful"] = False
self.pending_deletions[deletion_key]["error_messages"].append("Timeout waiting for storage responses")
# Check if all storage deletions were successful
deletion_info = self.pending_deletions.pop(deletion_key, {})
if deletion_info.get("all_successful", False):
# All storage deletions succeeded, now delete metadata
await self.table_store.delete_collection_metadata(message.user, message.collection)
logger.info(f"Successfully completed cascade deletion for {message.user}/{message.collection}")
return CollectionManagementResponse(
success="true",
timestamp=datetime.now().isoformat()
)
else:
# Some storage deletions failed
error_messages = deletion_info.get("error_messages", ["Unknown storage deletion error"])
error_msg = "; ".join(error_messages)
logger.error(f"Cascade deletion failed for {deletion_key}: {error_msg}")
return CollectionManagementResponse(
success="false",
error=Error(
type="storage_deletion_error",
message=f"Storage deletion failed: {error_msg}"
),
timestamp=datetime.now().isoformat()
)
except Exception as e:
logger.error(f"Error in cascade deletion: {e}")
return CollectionManagementResponse(
success="false",
error=Error(
type="deletion_error",
message=f"Failed to delete collection: {str(e)}"
),
timestamp=datetime.now().isoformat()
)
@staticmethod
def add_args(parser):
AsyncProcessor.add_args(parser)
add_cassandra_args(parser)
def run():
Processor.launch(default_ident, __doc__)
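
The cascade delete above fans a StorageManagementRequest out to the vector, object and triples storage topics, then blocks on an asyncio.Event until three responses arrive or a 30-second timeout fires. A distilled, Pulsar-free sketch of that wait-for-N-responses pattern (function and argument names here are illustrative, not part of the PR):

import asyncio

# Distilled sketch of the fan-out/await pattern used by handle_delete_collection.
# The real code keys pending_deletions by (user, collection).

async def cascade_delete(send_requests, pending, key, expected=3, timeout=30.0):
    pending[key] = {
        "responses_pending": expected,
        "all_successful": True,
        "error_messages": [],
        "deletion_complete": asyncio.Event(),
    }
    await send_requests()    # e.g. publish to the three storage topics
    try:
        await asyncio.wait_for(pending[key]["deletion_complete"].wait(), timeout)
    except asyncio.TimeoutError:
        pending[key]["all_successful"] = False
        pending[key]["error_messages"].append("Timeout waiting for storage responses")
    return pending.pop(key)

def record_response(pending, key, error_message=None):
    # Called once per incoming StorageManagementResponse.
    info = pending[key]
    info["responses_pending"] -= 1
    if error_message:
        info["all_successful"] = False
        info["error_messages"].append(error_message)
    if info["responses_pending"] == 0:
        info["deletion_complete"].set()

In the service itself the responses carry no correlation id, which is why on_storage_response attributes each one to the first pending deletion it finds; the handler's comments flag this as a simplification.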

View file

@ -95,7 +95,7 @@ class Processor(DocumentEmbeddingsQueryService):
dim = len(vec)
index_name = (
"d-" + msg.user + "-" + msg.collection + "-" + str(dim)
"d-" + msg.user + "-" + msg.collection
)
self.ensure_index_exists(index_name, dim)

View file

@ -104,7 +104,7 @@ class Processor(GraphEmbeddingsQueryService):
dim = len(vec)
index_name = (
"t-" + msg.user + "-" + msg.collection + "-" + str(dim)
"t-" + msg.user + "-" + msg.collection
)
self.ensure_index_exists(index_name, dim)

View file

@ -6,7 +6,7 @@ null. Output is a list of triples.
import logging
from .... direct.cassandra import TrustGraph
from .... direct.cassandra_kg import KnowledgeGraph
from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error
from .... schema import Value, Triple
from .... base import TriplesQueryService
@ -56,21 +56,21 @@ class Processor(TriplesQueryService):
try:
table = (query.user, query.collection)
user = query.user
if table != self.table:
if user != self.table:
if self.cassandra_username and self.cassandra_password:
self.tg = TrustGraph(
self.tg = KnowledgeGraph(
hosts=self.cassandra_host,
keyspace=query.user, table=query.collection,
keyspace=query.user,
username=self.cassandra_username, password=self.cassandra_password
)
else:
self.tg = TrustGraph(
self.tg = KnowledgeGraph(
hosts=self.cassandra_host,
keyspace=query.user, table=query.collection,
keyspace=query.user,
)
self.table = table
self.table = user
triples = []
@ -78,13 +78,13 @@ class Processor(TriplesQueryService):
if query.p is not None:
if query.o is not None:
resp = self.tg.get_spo(
query.s.value, query.p.value, query.o.value,
query.collection, query.s.value, query.p.value, query.o.value,
limit=query.limit
)
triples.append((query.s.value, query.p.value, query.o.value))
else:
resp = self.tg.get_sp(
query.s.value, query.p.value,
query.collection, query.s.value, query.p.value,
limit=query.limit
)
for t in resp:
@ -92,14 +92,14 @@ class Processor(TriplesQueryService):
else:
if query.o is not None:
resp = self.tg.get_os(
query.o.value, query.s.value,
query.collection, query.o.value, query.s.value,
limit=query.limit
)
for t in resp:
triples.append((query.s.value, t.p, query.o.value))
else:
resp = self.tg.get_s(
query.s.value,
query.collection, query.s.value,
limit=query.limit
)
for t in resp:
@ -108,14 +108,14 @@ class Processor(TriplesQueryService):
if query.p is not None:
if query.o is not None:
resp = self.tg.get_po(
query.p.value, query.o.value,
query.collection, query.p.value, query.o.value,
limit=query.limit
)
for t in resp:
triples.append((t.s, query.p.value, query.o.value))
else:
resp = self.tg.get_p(
query.p.value,
query.collection, query.p.value,
limit=query.limit
)
for t in resp:
@ -123,13 +123,14 @@ class Processor(TriplesQueryService):
else:
if query.o is not None:
resp = self.tg.get_o(
query.o.value,
query.collection, query.o.value,
limit=query.limit
)
for t in resp:
triples.append((t.s, t.p, query.o.value))
else:
resp = self.tg.get_all(
query.collection,
limit=query.limit
)
for t in resp:

View file

@ -3,8 +3,17 @@
Accepts entity/vector pairs and writes them to a Milvus store.
"""
import logging
from .... direct.milvus_doc_embeddings import DocVectors
from .... base import DocumentEmbeddingsStoreService
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
default_ident = "de-write"
default_store_uri = 'http://localhost:19530'
@ -23,6 +32,34 @@ class Processor(DocumentEmbeddingsStoreService):
self.vecstore = DocVectors(store_uri)
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
async def store_document_embeddings(self, message):
for emb in message.chunks:
@ -50,6 +87,48 @@ class Processor(DocumentEmbeddingsStoreService):
help=f'Milvus store URI (default: {default_store_uri})'
)
async def on_storage_management(self, message):
"""Handle storage management requests"""
logger.info(f"Storage management request: {message.operation} for {message.user}/{message.collection}")
try:
if message.operation == "delete-collection":
await self.handle_delete_collection(message)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {message.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, message):
"""Delete the collection for document embeddings"""
try:
self.vecstore.delete_collection(message.user, message.collection)
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {message.user}/{message.collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
raise
def run():
Processor.launch(default_ident, __doc__)
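
The same storage-management plumbing recurs in every store processor touched by this PR (Milvus, Pinecone, Qdrant, Cassandra, FalkorDB, Memgraph): a Consumer on the relevant storage-management topic, a Producer on storage_management_response_topic, and a handler that dispatches on the operation field. A condensed sketch of the shared handler shape; only the store-specific delete differs, and the helper names here are illustrative:

import logging

logger = logging.getLogger(__name__)

async def handle_storage_management(message, delete_fn, respond,
                                    response_cls, error_cls):
    # Shared shape of the on_storage_management handlers in this PR.
    # delete_fn(user, collection) is the store-specific deletion;
    # respond(response) publishes to storage_management_response_topic;
    # response_cls / error_cls stand in for StorageManagementResponse / Error.
    logger.info(f"Storage management request: {message.operation} "
                f"for {message.user}/{message.collection}")
    try:
        if message.operation == "delete-collection":
            delete_fn(message.user, message.collection)
            response = response_cls(error=None)    # no error means success
        else:
            response = response_cls(error=error_cls(
                type="invalid_operation",
                message=f"Unknown operation: {message.operation}"))
    except Exception as e:
        logger.error(f"Error processing storage management request: {e}",
                     exc_info=True)
        response = response_cls(error=error_cls(
            type="processing_error", message=str(e)))
    await respond(response)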

View file

@ -12,6 +12,10 @@ import os
import logging
from .... base import DocumentEmbeddingsStoreService
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -55,6 +59,34 @@ class Processor(DocumentEmbeddingsStoreService):
self.last_index_name = None
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
def create_index(self, index_name, dim):
self.pinecone.create_index(
@ -96,7 +128,7 @@ class Processor(DocumentEmbeddingsStoreService):
dim = len(vec)
index_name = (
"d-" + message.metadata.user + "-" + message.metadata.collection + "-" + str(dim)
"d-" + message.metadata.user + "-" + message.metadata.collection
)
if index_name != self.last_index_name:
@ -160,6 +192,54 @@ class Processor(DocumentEmbeddingsStoreService):
help=f'Pinecone region, (default: {default_region}'
)
async def on_storage_management(self, message):
"""Handle storage management requests"""
logger.info(f"Storage management request: {message.operation} for {message.user}/{message.collection}")
try:
if message.operation == "delete-collection":
await self.handle_delete_collection(message)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {message.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, message):
"""Delete the collection for document embeddings"""
try:
index_name = f"d-{message.user}-{message.collection}"
if self.pinecone.has_index(index_name):
self.pinecone.delete_index(index_name)
logger.info(f"Deleted Pinecone index: {index_name}")
else:
logger.info(f"Index {index_name} does not exist, nothing to delete")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {message.user}/{message.collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
raise
def run():
Processor.launch(default_ident, __doc__)

View file

@ -10,6 +10,10 @@ import uuid
import logging
from .... base import DocumentEmbeddingsStoreService
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -36,6 +40,37 @@ class Processor(DocumentEmbeddingsStoreService):
self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
# Set up storage management if base class attributes are available
# (they may not be in unit tests)
if hasattr(self, 'id') and hasattr(self, 'taskgroup') and hasattr(self, 'pulsar_client'):
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
async def store_document_embeddings(self, message):
for emb in message.chunks:
@ -48,8 +83,7 @@ class Processor(DocumentEmbeddingsStoreService):
dim = len(vec)
collection = (
"d_" + message.metadata.user + "_" +
message.metadata.collection + "_" +
str(dim)
message.metadata.collection
)
if collection != self.last_collection:
@ -99,6 +133,54 @@ class Processor(DocumentEmbeddingsStoreService):
help=f'Qdrant API key (default: None)'
)
async def on_storage_management(self, message):
"""Handle storage management requests"""
logger.info(f"Storage management request: {message.operation} for {message.user}/{message.collection}")
try:
if message.operation == "delete-collection":
await self.handle_delete_collection(message)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {message.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, message):
"""Delete the collection for document embeddings"""
try:
collection_name = f"d_{message.user}_{message.collection}"
if self.qdrant.collection_exists(collection_name):
self.qdrant.delete_collection(collection_name)
logger.info(f"Deleted Qdrant collection: {collection_name}")
else:
logger.info(f"Collection {collection_name} does not exist, nothing to delete")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {message.user}/{message.collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
raise
def run():
Processor.launch(default_ident, __doc__)

View file

@ -3,8 +3,17 @@
Accepts entity/vector pairs and writes them to a Milvus store.
"""
import logging
from .... direct.milvus_graph_embeddings import EntityVectors
from .... base import GraphEmbeddingsStoreService
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
default_ident = "ge-write"
default_store_uri = 'http://localhost:19530'
@ -23,6 +32,34 @@ class Processor(GraphEmbeddingsStoreService):
self.vecstore = EntityVectors(store_uri)
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
async def store_graph_embeddings(self, message):
for entity in message.entities:
@ -46,6 +83,48 @@ class Processor(GraphEmbeddingsStoreService):
help=f'Milvus store URI (default: {default_store_uri})'
)
async def on_storage_management(self, message):
"""Handle storage management requests"""
logger.info(f"Storage management request: {message.operation} for {message.user}/{message.collection}")
try:
if message.operation == "delete-collection":
await self.handle_delete_collection(message)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {message.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, message):
"""Delete the collection for graph embeddings"""
try:
self.vecstore.delete_collection(message.user, message.collection)
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {message.user}/{message.collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
raise
def run():
Processor.launch(default_ident, __doc__)

View file

@ -12,6 +12,10 @@ import os
import logging
from .... base import GraphEmbeddingsStoreService
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -55,6 +59,34 @@ class Processor(GraphEmbeddingsStoreService):
self.last_index_name = None
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
def create_index(self, index_name, dim):
self.pinecone.create_index(
@ -95,7 +127,7 @@ class Processor(GraphEmbeddingsStoreService):
dim = len(vec)
index_name = (
"t-" + message.metadata.user + "-" + message.metadata.collection + "-" + str(dim)
"t-" + message.metadata.user + "-" + message.metadata.collection
)
if index_name != self.last_index_name:
@ -159,6 +191,54 @@ class Processor(GraphEmbeddingsStoreService):
help=f'Pinecone region, (default: {default_region}'
)
async def on_storage_management(self, message):
"""Handle storage management requests"""
logger.info(f"Storage management request: {message.operation} for {message.user}/{message.collection}")
try:
if message.operation == "delete-collection":
await self.handle_delete_collection(message)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {message.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, message):
"""Delete the collection for graph embeddings"""
try:
index_name = f"t-{message.user}-{message.collection}"
if self.pinecone.has_index(index_name):
self.pinecone.delete_index(index_name)
logger.info(f"Deleted Pinecone index: {index_name}")
else:
logger.info(f"Index {index_name} does not exist, nothing to delete")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {message.user}/{message.collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
raise
def run():
Processor.launch(default_ident, __doc__)

View file

@ -10,6 +10,10 @@ import uuid
import logging
from .... base import GraphEmbeddingsStoreService
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -36,10 +40,41 @@ class Processor(GraphEmbeddingsStoreService):
self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
# Set up storage management if base class attributes are available
# (they may not be in unit tests)
if hasattr(self, 'id') and hasattr(self, 'taskgroup') and hasattr(self, 'pulsar_client'):
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
def get_collection(self, dim, user, collection):
cname = (
"t_" + user + "_" + collection + "_" + str(dim)
"t_" + user + "_" + collection
)
if cname != self.last_collection:
@ -105,6 +140,54 @@ class Processor(GraphEmbeddingsStoreService):
help=f'Qdrant API key'
)
async def on_storage_management(self, message):
"""Handle storage management requests"""
logger.info(f"Storage management request: {message.operation} for {message.user}/{message.collection}")
try:
if message.operation == "delete-collection":
await self.handle_delete_collection(message)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {message.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, message):
"""Delete the collection for graph embeddings"""
try:
collection_name = f"t_{message.user}_{message.collection}"
if self.qdrant.collection_exists(collection_name):
self.qdrant.delete_collection(collection_name)
logger.info(f"Deleted Qdrant collection: {collection_name}")
else:
logger.info(f"Collection {collection_name} does not exist, nothing to delete")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {message.user}/{message.collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
raise
def run():
Processor.launch(default_ident, __doc__)

View file

@ -13,7 +13,9 @@ from cassandra import ConsistencyLevel
from .... schema import ExtractedObject
from .... schema import RowSchema, Field
from .... base import FlowProcessor, ConsumerSpec
from .... schema import StorageManagementRequest, StorageManagementResponse
from .... schema import object_storage_management_topic, storage_management_response_topic
from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config
# Module logger
@ -61,7 +63,38 @@ class Processor(FlowProcessor):
handler = self.on_object
)
)
# Set up storage management consumer and producer directly
# (FlowProcessor doesn't support topic-based specs outside of flows)
from .... base import Consumer, Producer, ConsumerMetrics, ProducerMetrics
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Create storage management consumer
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=object_storage_management_topic,
subscriber=f"{id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Create storage management response producer
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
# Register config handler for schema updates
self.register_config_handler(self.on_schema_config)
@ -390,6 +423,100 @@ class Processor(FlowProcessor):
logger.error(f"Failed to insert object {obj_index}: {e}", exc_info=True)
raise
async def on_storage_management(self, msg, consumer, flow):
"""Handle storage management requests for collection operations"""
logger.info(f"Received storage management request: {msg.operation} for {msg.user}/{msg.collection}")
try:
if msg.operation == "delete-collection":
await self.delete_collection(msg.user, msg.collection)
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {msg.user}/{msg.collection}")
else:
logger.warning(f"Unknown storage management operation: {msg.operation}")
# Send error response
from .... schema import Error
response = StorageManagementResponse(
error=Error(
type="unknown_operation",
message=f"Unknown operation: {msg.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error handling storage management request: {e}", exc_info=True)
# Send error response
from .... schema import Error
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def delete_collection(self, user: str, collection: str):
"""Delete all data for a specific collection"""
# Connect if not already connected
self.connect_cassandra()
# Sanitize names for safety
safe_keyspace = self.sanitize_name(user)
# Check if keyspace exists
if safe_keyspace not in self.known_keyspaces:
# Query to verify keyspace exists
check_keyspace_cql = """
SELECT keyspace_name FROM system_schema.keyspaces
WHERE keyspace_name = %s
"""
result = self.session.execute(check_keyspace_cql, (safe_keyspace,))
if not result.one():
logger.info(f"Keyspace {safe_keyspace} does not exist, nothing to delete")
return
self.known_keyspaces.add(safe_keyspace)
# Get all tables in the keyspace that might contain collection data
get_tables_cql = """
SELECT table_name FROM system_schema.tables
WHERE keyspace_name = %s
"""
tables = self.session.execute(get_tables_cql, (safe_keyspace,))
tables_deleted = 0
for row in tables:
table_name = row.table_name
# Check if the table has a collection column
check_column_cql = """
SELECT column_name FROM system_schema.columns
WHERE keyspace_name = %s AND table_name = %s AND column_name = 'collection'
"""
result = self.session.execute(check_column_cql, (safe_keyspace, table_name))
if result.one():
# Table has collection column, delete data for this collection
try:
delete_cql = f"""
DELETE FROM {safe_keyspace}.{table_name}
WHERE collection = %s
"""
self.session.execute(delete_cql, (collection,))
tables_deleted += 1
logger.info(f"Deleted collection {collection} from table {safe_keyspace}.{table_name}")
except Exception as e:
logger.error(f"Failed to delete from table {safe_keyspace}.{table_name}: {e}")
raise
logger.info(f"Deleted collection {collection} from {tables_deleted} tables in keyspace {safe_keyspace}")
def close(self):
"""Clean up Cassandra connections"""
if self.cluster:

View file

@ -10,9 +10,13 @@ import argparse
import time
import logging
from .... direct.cassandra import TrustGraph
from .... direct.cassandra_kg import KnowledgeGraph
from .... base import TriplesStoreService
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import triples_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -50,42 +54,146 @@ class Processor(TriplesStoreService):
self.cassandra_password = password
self.table = None
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=triples_storage_management_topic,
subscriber=f"{id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
async def store_triples(self, message):
table = (message.metadata.user, message.metadata.collection)
user = message.metadata.user
if self.table is None or self.table != table:
if self.table is None or self.table != user:
self.tg = None
try:
if self.cassandra_username and self.cassandra_password:
self.tg = TrustGraph(
self.tg = KnowledgeGraph(
hosts=self.cassandra_host,
keyspace=message.metadata.user,
table=message.metadata.collection,
username=self.cassandra_username, password=self.cassandra_password
)
else:
self.tg = TrustGraph(
self.tg = KnowledgeGraph(
hosts=self.cassandra_host,
keyspace=message.metadata.user,
table=message.metadata.collection,
)
except Exception as e:
logger.error(f"Exception: {e}", exc_info=True)
time.sleep(1)
raise e
self.table = table
self.table = user
for t in message.triples:
self.tg.insert(
message.metadata.collection,
t.s.value,
t.p.value,
t.o.value
)
async def on_storage_management(self, message):
"""Handle storage management requests"""
logger.info(f"Storage management request: {message.operation} for {message.user}/{message.collection}")
try:
if message.operation == "delete-collection":
await self.handle_delete_collection(message)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {message.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, message):
"""Delete all data for a specific collection from the unified triples table"""
try:
# Create or reuse connection for this user's keyspace
if self.table is None or self.table != message.user:
self.tg = None
try:
if self.cassandra_username and self.cassandra_password:
self.tg = KnowledgeGraph(
hosts=self.cassandra_host,
keyspace=message.user,
username=self.cassandra_username,
password=self.cassandra_password
)
else:
self.tg = KnowledgeGraph(
hosts=self.cassandra_host,
keyspace=message.user,
)
except Exception as e:
logger.error(f"Failed to connect to Cassandra for user {message.user}: {e}")
raise
self.table = message.user
# Delete all triples for this collection from the unified table
# In the unified table schema, collection is the partition key
delete_cql = """
DELETE FROM triples
WHERE collection = ?
"""
try:
self.tg.session.execute(delete_cql, (message.collection,))
logger.info(f"Deleted all triples for collection {message.collection} from keyspace {message.user}")
except Exception as e:
logger.error(f"Failed to delete collection data: {e}")
raise
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {message.user}/{message.collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
raise
@staticmethod
def add_args(parser):

View file

@ -13,6 +13,10 @@ import logging
from falkordb import FalkorDB
from .... base import TriplesStoreService
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import triples_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -40,14 +44,44 @@ class Processor(TriplesStoreService):
self.io = FalkorDB.from_url(graph_url).select_graph(database)
def create_node(self, uri):
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
logger.debug(f"Create node {uri}")
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=triples_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
def create_node(self, uri, user, collection):
logger.debug(f"Create node {uri} for user={user}, collection={collection}")
res = self.io.query(
"MERGE (n:Node {uri: $uri})",
"MERGE (n:Node {uri: $uri, user: $user, collection: $collection})",
params={
"uri": uri,
"user": user,
"collection": collection,
},
)
@ -56,14 +90,16 @@ class Processor(TriplesStoreService):
time=res.run_time_ms
))
def create_literal(self, value):
def create_literal(self, value, user, collection):
logger.debug(f"Create literal {value}")
logger.debug(f"Create literal {value} for user={user}, collection={collection}")
res = self.io.query(
"MERGE (n:Literal {value: $value})",
"MERGE (n:Literal {value: $value, user: $user, collection: $collection})",
params={
"value": value,
"user": user,
"collection": collection,
},
)
@ -72,18 +108,20 @@ class Processor(TriplesStoreService):
time=res.run_time_ms
))
def relate_node(self, src, uri, dest):
def relate_node(self, src, uri, dest, user, collection):
logger.debug(f"Create node rel {src} {uri} {dest}")
logger.debug(f"Create node rel {src} {uri} {dest} for user={user}, collection={collection}")
res = self.io.query(
"MATCH (src:Node {uri: $src}) "
"MATCH (dest:Node {uri: $dest}) "
"MERGE (src)-[:Rel {uri: $uri}]->(dest)",
"MATCH (src:Node {uri: $src, user: $user, collection: $collection}) "
"MATCH (dest:Node {uri: $dest, user: $user, collection: $collection}) "
"MERGE (src)-[:Rel {uri: $uri, user: $user, collection: $collection}]->(dest)",
params={
"src": src,
"dest": dest,
"uri": uri,
"user": user,
"collection": collection,
},
)
@ -92,18 +130,20 @@ class Processor(TriplesStoreService):
time=res.run_time_ms
))
def relate_literal(self, src, uri, dest):
def relate_literal(self, src, uri, dest, user, collection):
logger.debug(f"Create literal rel {src} {uri} {dest}")
logger.debug(f"Create literal rel {src} {uri} {dest} for user={user}, collection={collection}")
res = self.io.query(
"MATCH (src:Node {uri: $src}) "
"MATCH (dest:Literal {value: $dest}) "
"MERGE (src)-[:Rel {uri: $uri}]->(dest)",
"MATCH (src:Node {uri: $src, user: $user, collection: $collection}) "
"MATCH (dest:Literal {value: $dest, user: $user, collection: $collection}) "
"MERGE (src)-[:Rel {uri: $uri, user: $user, collection: $collection}]->(dest)",
params={
"src": src,
"dest": dest,
"uri": uri,
"user": user,
"collection": collection,
},
)
@ -113,17 +153,20 @@ class Processor(TriplesStoreService):
))
async def store_triples(self, message):
# Extract user and collection from metadata
user = message.metadata.user if message.metadata.user else "default"
collection = message.metadata.collection if message.metadata.collection else "default"
for t in message.triples:
self.create_node(t.s.value)
self.create_node(t.s.value, user, collection)
if t.o.is_uri:
self.create_node(t.o.value)
self.relate_node(t.s.value, t.p.value, t.o.value)
self.create_node(t.o.value, user, collection)
self.relate_node(t.s.value, t.p.value, t.o.value, user, collection)
else:
self.create_literal(t.o.value)
self.relate_literal(t.s.value, t.p.value, t.o.value)
self.create_literal(t.o.value, user, collection)
self.relate_literal(t.s.value, t.p.value, t.o.value, user, collection)
@staticmethod
def add_args(parser):
@ -142,6 +185,59 @@ class Processor(TriplesStoreService):
help=f'FalkorDB database (default: {default_database})'
)
async def on_storage_management(self, message):
"""Handle storage management requests"""
logger.info(f"Storage management request: {message.operation} for {message.user}/{message.collection}")
try:
if message.operation == "delete-collection":
await self.handle_delete_collection(message)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {message.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, message):
"""Delete the collection for FalkorDB triples"""
try:
# Delete all nodes and literals for this user/collection
node_result = self.io.query(
"MATCH (n:Node {user: $user, collection: $collection}) DETACH DELETE n",
params={"user": message.user, "collection": message.collection}
)
literal_result = self.io.query(
"MATCH (n:Literal {user: $user, collection: $collection}) DETACH DELETE n",
params={"user": message.user, "collection": message.collection}
)
logger.info(f"Deleted {node_result.nodes_deleted} nodes and {literal_result.nodes_deleted} literals for collection {message.user}/{message.collection}")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {message.user}/{message.collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
raise
def run():
Processor.launch(default_ident, __doc__)
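For orientation, a minimal client-side sketch of the delete-collection round trip the handler above serves. This is hypothetical: it assumes the schema classes and topic constants are importable as trustgraph.schema, that the request/response types are plain Pulsar records accepting the fields used below as keyword arguments, and a local broker URL; none of those details are part of this diff.

# Hypothetical client for the storage management API sketched above.
# Import path, broker URL and record construction are assumptions.
import pulsar
from pulsar.schema import JsonSchema

from trustgraph.schema import (
    StorageManagementRequest, StorageManagementResponse,
    triples_storage_management_topic, storage_management_response_topic,
)

client = pulsar.Client("pulsar://localhost:6650")

producer = client.create_producer(
    triples_storage_management_topic,
    schema=JsonSchema(StorageManagementRequest),
)
consumer = client.subscribe(
    storage_management_response_topic,
    subscription_name="delete-collection-demo",
    schema=JsonSchema(StorageManagementResponse),
)

# Ask the triples stores listening on the topic to drop one collection
producer.send(StorageManagementRequest(
    operation="delete-collection", user="alice", collection="research",
))

# error=None on the response signals success
msg = consumer.receive()
resp = msg.value()
print("deleted" if resp.error is None else resp.error.message)
consumer.acknowledge(msg)
client.close()

Each triples store subscribed to triples_storage_management_topic (FalkorDB here, Memgraph and Neo4j below) handles the request independently and replies on the shared response topic.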
View file
@@ -13,6 +13,10 @@ import logging
from neo4j import GraphDatabase
from .... base import TriplesStoreService
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import triples_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@@ -49,6 +53,34 @@ class Processor(TriplesStoreService):
with self.io.session(database=self.db) as session:
self.create_indexes(session)
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=triples_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
def create_indexes(self, session):
# Race condition, index creation failure is ignored. Right thing
@@ -285,6 +317,67 @@ class Processor(TriplesStoreService):
help=f'Memgraph database (default: {default_database})'
)
async def on_storage_management(self, message):
"""Handle storage management requests"""
logger.info(f"Storage management request: {message.operation} for {message.user}/{message.collection}")
try:
if message.operation == "delete-collection":
await self.handle_delete_collection(message)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {message.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, message):
"""Delete all data for a specific collection"""
try:
with self.io.session(database=self.db) as session:
# Delete all nodes for this user and collection
node_result = session.run(
"MATCH (n:Node {user: $user, collection: $collection}) "
"DETACH DELETE n",
user=message.user, collection=message.collection
)
nodes_deleted = node_result.consume().counters.nodes_deleted
# Delete all literals for this user and collection
literal_result = session.run(
"MATCH (n:Literal {user: $user, collection: $collection}) "
"DETACH DELETE n",
user=message.user, collection=message.collection
)
literals_deleted = literal_result.consume().counters.nodes_deleted
# Note: Relationships are automatically deleted with DETACH DELETE
logger.info(f"Deleted {nodes_deleted} nodes and {literals_deleted} literals for {message.user}/{message.collection}")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {message.user}/{message.collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
raise
def run():
Processor.launch(default_ident, __doc__)
View file
@@ -12,6 +12,10 @@ import logging
from neo4j import GraphDatabase
from .... base import TriplesStoreService
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import triples_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@@ -49,6 +53,34 @@ class Processor(TriplesStoreService):
with self.io.session(database=self.db) as session:
self.create_indexes(session)
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=triples_storage_management_topic,
subscriber=f"{id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
def create_indexes(self, session):
# Race condition, index creation failure is ignored. Right thing
@@ -236,6 +268,67 @@ class Processor(TriplesStoreService):
help=f'Neo4j database (default: {default_database})'
)
async def on_storage_management(self, message):
"""Handle storage management requests"""
logger.info(f"Storage management request: {message.operation} for {message.user}/{message.collection}")
try:
if message.operation == "delete-collection":
await self.handle_delete_collection(message)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {message.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, message):
"""Delete all data for a specific collection"""
try:
with self.io.session(database=self.db) as session:
# Delete all nodes for this user and collection
node_result = session.run(
"MATCH (n:Node {user: $user, collection: $collection}) "
"DETACH DELETE n",
user=message.user, collection=message.collection
)
nodes_deleted = node_result.consume().counters.nodes_deleted
# Delete all literals for this user and collection
literal_result = session.run(
"MATCH (n:Literal {user: $user, collection: $collection}) "
"DETACH DELETE n",
user=message.user, collection=message.collection
)
literals_deleted = literal_result.consume().counters.nodes_deleted
# Note: Relationships are automatically deleted with DETACH DELETE
logger.info(f"Deleted {nodes_deleted} nodes and {literals_deleted} literals for {message.user}/{message.collection}")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {message.user}/{message.collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
raise
def run():
Processor.launch(default_ident, __doc__)
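Since every Node, Literal and Rel now carries user and collection properties, one collection can be deleted without touching another. Below is a hypothetical smoke test for that isolation, run with the plain neo4j driver and the same Cypher that handle_delete_collection issues; the Bolt URI, credentials and database name are assumptions.

# Hypothetical isolation check; connection details are assumptions.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session(database="neo4j") as session:
    # Two nodes sharing a URI but living in different collections
    session.run(
        "MERGE (:Node {uri: $uri, user: $user, collection: 'keep'}) "
        "MERGE (:Node {uri: $uri, user: $user, collection: 'drop'})",
        uri="http://example.org/x", user="alice",
    )

    # The same statement the handler runs, here for the 'drop' collection
    deleted = session.run(
        "MATCH (n:Node {user: $user, collection: $collection}) DETACH DELETE n",
        user="alice", collection="drop",
    ).consume().counters.nodes_deleted

    remaining = session.run(
        "MATCH (n:Node {user: $user, collection: 'keep'}) RETURN count(n) AS c",
        user="alice",
    ).single()["c"]

    print(f"{deleted} deleted, {remaining} left in 'keep'")

driver.close()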
View file
@@ -111,6 +111,21 @@ class LibraryTableStore:
);
""");
logger.debug("collections table...")
self.cassandra.execute("""
CREATE TABLE IF NOT EXISTS collections (
user text,
collection text,
name text,
description text,
tags set<text>,
created_at timestamp,
updated_at timestamp,
PRIMARY KEY (user, collection)
);
""");
logger.info("Cassandra schema OK.")
def prepare_statements(self):
@@ -187,6 +202,43 @@ class LibraryTableStore:
LIMIT 1
""")
# Collection management statements
self.insert_collection_stmt = self.cassandra.prepare("""
INSERT INTO collections
(user, collection, name, description, tags, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""")
self.update_collection_stmt = self.cassandra.prepare("""
UPDATE collections
SET name = ?, description = ?, tags = ?, updated_at = ?
WHERE user = ? AND collection = ?
""")
self.get_collection_stmt = self.cassandra.prepare("""
SELECT collection, name, description, tags, created_at, updated_at
FROM collections
WHERE user = ? AND collection = ?
""")
self.list_collections_stmt = self.cassandra.prepare("""
SELECT collection, name, description, tags, created_at, updated_at
FROM collections
WHERE user = ?
""")
self.delete_collection_stmt = self.cassandra.prepare("""
DELETE FROM collections
WHERE user = ? AND collection = ?
""")
self.collection_exists_stmt = self.cassandra.prepare("""
SELECT collection
FROM collections
WHERE user = ? AND collection = ?
LIMIT 1
""")
self.list_processing_stmt = self.cassandra.prepare("""
SELECT
id, document_id, time, flow, collection, tags
@@ -521,3 +573,113 @@ class LibraryTableStore:
return lst
# Collection management methods
async def ensure_collection_exists(self, user, collection):
"""Ensure collection metadata record exists, create if not"""
try:
resp = await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.collection_exists_stmt, [user, collection]
)
if resp:
return
import datetime
now = datetime.datetime.now()
await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.insert_collection_stmt,
[user, collection, collection, "", set(), now, now]
)
logger.debug(f"Created collection metadata for {user}/{collection}")
except Exception as e:
logger.error(f"Error ensuring collection exists: {e}")
raise
async def list_collections(self, user, tag_filter=None):
"""List collections for a user, optionally filtered by tags"""
try:
resp = await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.list_collections_stmt, [user]
)
collections = []
for row in resp:
collection_data = {
"user": user,
"collection": row[0],
"name": row[1] or row[0],
"description": row[2] or "",
"tags": list(row[3]) if row[3] else [],
"created_at": row[4].isoformat() if row[4] else "",
"updated_at": row[5].isoformat() if row[5] else ""
}
if tag_filter:
collection_tags = set(collection_data["tags"])
filter_tags = set(tag_filter)
if not filter_tags.intersection(collection_tags):
continue
collections.append(collection_data)
return collections
except Exception as e:
logger.error(f"Error listing collections: {e}")
raise
async def update_collection(self, user, collection, name=None, description=None, tags=None):
"""Update collection metadata"""
try:
resp = await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.get_collection_stmt, [user, collection]
)
if not resp:
raise RequestError(f"Collection {collection} not found")
row = resp.one()
current_name = row[1] or collection
current_description = row[2] or ""
current_tags = set(row[3]) if row[3] else set()
new_name = name if name is not None else current_name
new_description = description if description is not None else current_description
new_tags = set(tags) if tags is not None else current_tags
import datetime
now = datetime.datetime.now()
await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.update_collection_stmt,
[new_name, new_description, new_tags, now, user, collection]
)
return {
"user": user, "collection": collection, "name": new_name,
"description": new_description, "tags": list(new_tags),
"updated_at": now.isoformat()
}
except Exception as e:
logger.error(f"Error updating collection: {e}")
raise
async def delete_collection_metadata(self, user, collection):
"""Delete collection metadata record"""
try:
await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.delete_collection_stmt, [user, collection]
)
logger.debug(f"Deleted collection metadata for {user}/{collection}")
except Exception as e:
logger.error(f"Error deleting collection metadata: {e}")
raise
async def get_collection(self, user, collection):
"""Get collection metadata"""
try:
resp = await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.get_collection_stmt, [user, collection]
)
if not resp:
return None
row = resp.one()
return {
"user": user, "collection": row[0], "name": row[1] or row[0],
"description": row[2] or "", "tags": list(row[3]) if row[3] else [],
"created_at": row[4].isoformat() if row[4] else "",
"updated_at": row[5].isoformat() if row[5] else ""
}
except Exception as e:
logger.error(f"Error getting collection: {e}")
raise
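Taken together, these helpers give the librarian a small per-user metadata API over the collections table. A minimal usage sketch follows, assuming store is an already-initialised LibraryTableStore (Cassandra connection details are out of scope here).

# Hypothetical usage; `store` is assumed to be a configured LibraryTableStore.
import asyncio

async def demo(store):
    # Create the metadata row on first use, then attach a name and tags
    await store.ensure_collection_exists("alice", "research")
    await store.update_collection(
        "alice", "research",
        name="Research notes", description="Papers and extracts", tags=["papers"],
    )

    # tag_filter keeps only collections sharing at least one requested tag
    for c in await store.list_collections("alice", tag_filter=["papers"]):
        print(c["collection"], c["name"], c["tags"])

    # Remove the metadata row; the triples themselves are deleted by the
    # storage management handlers earlier in this commit
    await store.delete_collection_metadata("alice", "research")

# asyncio.run(demo(store))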