Basic multitenant support (#583)

* Tech spec

* Address multi-tenant queue option problems in CLI

* Modified collection service to use config

* Changed storage management to use the config service definition
This commit is contained in:
cybermaggedon 2025-12-05 21:45:30 +00:00 committed by GitHub
parent 789d9713a0
commit 7d07f802a8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 1416 additions and 1731 deletions

View file

@ -26,9 +26,6 @@ from ... base import Consumer, Producer
# Module logger
logger = logging.getLogger(__name__)
# FIXME: How to ensure this doesn't conflict with other usage?
keyspace = "config"
default_ident = "config-svc"
default_config_request_queue = config_request_queue
@ -64,12 +61,13 @@ class Processor(AsyncProcessor):
cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
hosts, username, password, keyspace = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password
password=cassandra_password,
default_keyspace="config"
)
# Store resolved configuration
@ -273,7 +271,7 @@ class Processor(AsyncProcessor):
)
parser.add_argument(
'--push-queue',
'--config-push-queue',
default=default_config_push_queue,
help=f'Config push queue (default: {default_config_push_queue})'
)

View file

@ -33,9 +33,6 @@ default_knowledge_response_queue = knowledge_response_queue
default_cassandra_host = "cassandra"
# FIXME: How to ensure this doesn't conflict with other usage?
keyspace = "knowledge"
class Processor(AsyncProcessor):
def __init__(self, **params):
@ -53,14 +50,15 @@ class Processor(AsyncProcessor):
cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
hosts, username, password, keyspace = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password
password=cassandra_password,
default_keyspace="knowledge"
)
# Store resolved configuration
self.cassandra_host = hosts
self.cassandra_username = username

View file

@ -1,142 +1,130 @@
"""
Collection management for the librarian
Collection management for the librarian - uses config service for storage
"""
import asyncio
import logging
import json
import uuid
from datetime import datetime
from typing import Dict, Any, List, Optional
from .. schema import CollectionManagementRequest, CollectionManagementResponse, Error
from .. schema import CollectionMetadata
from .. schema import StorageManagementRequest, StorageManagementResponse
from .. schema import ConfigRequest, ConfigResponse
from .. exceptions import RequestError
from .. tables.library import LibraryTableStore
# Module logger
logger = logging.getLogger(__name__)
class CollectionManager:
"""Manages collection metadata and coordinates collection operations across storage types"""
"""Manages collection metadata via config service"""
def __init__(
self,
cassandra_host,
cassandra_username,
cassandra_password,
keyspace,
vector_storage_producer=None,
object_storage_producer=None,
triples_storage_producer=None,
storage_response_consumer=None
config_request_producer,
config_response_consumer,
taskgroup
):
"""
Initialize the CollectionManager
Args:
cassandra_host: Cassandra host(s)
cassandra_username: Cassandra username
cassandra_password: Cassandra password
keyspace: Cassandra keyspace for library data
vector_storage_producer: Producer for vector storage management
object_storage_producer: Producer for object storage management
triples_storage_producer: Producer for triples storage management
storage_response_consumer: Consumer for storage management responses
config_request_producer: Producer for config service requests
config_response_consumer: Consumer for config service responses
taskgroup: Task group for async operations
"""
self.table_store = LibraryTableStore(
cassandra_host, cassandra_username, cassandra_password, keyspace
)
self.config_request_producer = config_request_producer
self.config_response_consumer = config_response_consumer
self.taskgroup = taskgroup
# Storage management producers
self.vector_storage_producer = vector_storage_producer
self.object_storage_producer = object_storage_producer
self.triples_storage_producer = triples_storage_producer
self.storage_response_consumer = storage_response_consumer
# Track pending config requests
self.pending_config_requests = {}
# Track pending deletion operations
self.pending_deletions = {}
logger.info("Collection manager initialized with config service backend")
logger.info("Collection manager initialized")
async def send_config_request(self, request: ConfigRequest) -> ConfigResponse:
    """
    Send a config request and wait for the matching response.

    Correlation protocol: an asyncio.Event is parked in
    pending_config_requests under request.id; on_config_response stores
    the response under request.id + "_response" and sets the Event.

    Args:
        request: Config service request (must carry a unique id)

    Returns:
        ConfigResponse from the config service

    Raises:
        KeyError: if the Event was set without a response being stored
            (should not happen with on_config_response as the only setter)
    """
    event = asyncio.Event()
    self.pending_config_requests[request.id] = event

    await self.config_request_producer.send(request)

    # NOTE(review): no timeout here — a lost response blocks forever.
    # Consider asyncio.wait_for with a deadline; left unchanged to avoid
    # introducing a new TimeoutError into callers' error handling.
    await event.wait()

    # Remove BOTH tracking entries. Previously only the "_response" key
    # was popped, so the Event stayed in the dict forever — one leaked
    # entry per request.
    self.pending_config_requests.pop(request.id, None)
    response = self.pending_config_requests.pop(request.id + "_response")
    return response
async def on_config_response(self, message, consumer, flow):
    """
    Consumer callback for config service responses.

    Correlates a response with its pending request by id: the response
    object is stashed under id + "_response" and the parked Event is
    set so that send_config_request can resume. Responses with no
    matching pending request are ignored.

    Args:
        message: Pulsar message carrying a ConfigResponse
        consumer: Consumer instance (unused here)
        flow: Flow context (unused here)
    """
    response = message.value()
    pending = self.pending_config_requests

    # Guard: drop responses nobody is waiting for.
    if response.id not in pending:
        return

    pending[response.id + "_response"] = response
    pending[response.id].set()
async def ensure_collection_exists(self, user: str, collection: str):
"""
Ensure a collection exists, creating it if necessary with broadcast to storage
Ensure a collection exists, creating it if necessary
Args:
user: User ID
collection: Collection ID
"""
try:
# Check if collection already exists
existing = await self.table_store.get_collection(user, collection)
if existing:
# Check if collection exists via config service
request = ConfigRequest(
id=str(uuid.uuid4()),
operation='get',
type='collection',
keys=[f'{user}:{collection}']
)
response = await self.send_config_request(request)
# If collection exists, we're done
if response.values and len(response.values) > 0:
logger.debug(f"Collection {user}/{collection} already exists")
return
# Create new collection with default metadata
logger.info(f"Auto-creating collection {user}/{collection} from document submission")
await self.table_store.create_collection(
logger.info(f"Auto-creating collection {user}/{collection}")
metadata = CollectionMetadata(
user=user,
collection=collection,
name=collection, # Default name to collection ID
description="",
tags=set()
tags=[]
)
# Broadcast collection creation to all storage backends
creation_key = (user, collection)
logger.info(f"Broadcasting create-collection for {creation_key}")
self.pending_deletions[creation_key] = {
"responses_pending": 4, # doc-embeddings, graph-embeddings, object, triples
"responses_received": [],
"all_successful": True,
"error_messages": [],
"deletion_complete": asyncio.Event()
}
storage_request = StorageManagementRequest(
operation="create-collection",
user=user,
collection=collection
request = ConfigRequest(
id=str(uuid.uuid4()),
operation='put',
type='collection',
key=f'{user}:{collection}',
value=json.dumps(metadata.to_dict())
)
# Send creation requests to all storage types
if self.vector_storage_producer:
await self.vector_storage_producer.send(storage_request)
if self.object_storage_producer:
await self.object_storage_producer.send(storage_request)
if self.triples_storage_producer:
await self.triples_storage_producer.send(storage_request)
response = await self.send_config_request(request)
# Wait for all storage creations to complete (with timeout)
creation_info = self.pending_deletions[creation_key]
try:
await asyncio.wait_for(
creation_info["deletion_complete"].wait(),
timeout=30.0 # 30 second timeout
)
except asyncio.TimeoutError:
logger.error(f"Timeout waiting for storage creation responses for {creation_key}")
creation_info["all_successful"] = False
creation_info["error_messages"].append("Timeout waiting for storage creation")
if response.error:
raise RuntimeError(f"Config update failed: {response.error.message}")
# Check if all creations succeeded
if not creation_info["all_successful"]:
error_msg = f"Storage creation failed: {'; '.join(creation_info['error_messages'])}"
logger.error(error_msg)
# Clean up metadata on failure
await self.table_store.delete_collection(user, collection)
# Clean up tracking
del self.pending_deletions[creation_key]
raise RuntimeError(error_msg)
# Clean up tracking
del self.pending_deletions[creation_key]
logger.info(f"Collection {creation_key} auto-created successfully in all storage backends")
logger.info(f"Collection {user}/{collection} auto-created in config service")
except Exception as e:
logger.error(f"Error ensuring collection exists: {e}")
@ -144,7 +132,7 @@ class CollectionManager:
async def list_collections(self, request: CollectionManagementRequest) -> CollectionManagementResponse:
"""
List collections for a user with optional tag filtering
List collections for a user from config service
Args:
request: Collection management request
@ -153,25 +141,43 @@ class CollectionManager:
CollectionManagementResponse with list of collections
"""
try:
tag_filter = list(request.tag_filter) if request.tag_filter else None
collections = await self.table_store.list_collections(request.user, tag_filter)
# Get all collections from config service
config_request = ConfigRequest(
id=str(uuid.uuid4()),
operation='getvalues',
type='collection'
)
collection_metadata = [
CollectionMetadata(
user=coll["user"],
collection=coll["collection"],
name=coll["name"],
description=coll["description"],
tags=coll["tags"],
created_at=coll["created_at"],
updated_at=coll["updated_at"]
)
for coll in collections
]
response = await self.send_config_request(config_request)
if response.error:
raise RuntimeError(f"Config query failed: {response.error.message}")
# Parse collections and filter by user
collections = []
for key, value_json in response.values.items():
if ":" in key:
coll_user, coll_name = key.split(":", 1)
if coll_user == request.user:
metadata_dict = json.loads(value_json)
metadata = CollectionMetadata(**metadata_dict)
collections.append(metadata)
# Apply tag filtering if specified
if request.tag_filter:
tag_filter_set = set(request.tag_filter)
collections = [
c for c in collections
if any(tag in tag_filter_set for tag in c.tags)
]
# Apply limit if specified
if request.limit and request.limit > 0:
collections = collections[:request.limit]
return CollectionManagementResponse(
error=None,
collections=collection_metadata,
collections=collections,
timestamp=datetime.now().isoformat()
)
@ -181,7 +187,7 @@ class CollectionManager:
async def update_collection(self, request: CollectionManagementRequest) -> CollectionManagementResponse:
"""
Update collection metadata (creates if doesn't exist)
Update collection metadata via config service (creates if doesn't exist)
Args:
request: Collection management request
@ -190,120 +196,41 @@ class CollectionManager:
CollectionManagementResponse with updated collection
"""
try:
# Check if collection exists, create if it doesn't
existing = await self.table_store.get_collection(request.user, request.collection)
if not existing:
# Create new collection with provided metadata
logger.info(f"Creating new collection {request.user}/{request.collection}")
# Create metadata from request
name = request.name if request.name else request.collection
description = request.description if request.description else ""
tags = list(request.tags) if request.tags else []
name = request.name if request.name else request.collection
description = request.description if request.description else ""
tags = set(request.tags) if request.tags else set()
metadata = CollectionMetadata(
user=request.user,
collection=request.collection,
name=name,
description=description,
tags=tags
)
await self.table_store.create_collection(
user=request.user,
collection=request.collection,
name=name,
description=description,
tags=tags
)
# Send put request to config service
config_request = ConfigRequest(
id=str(uuid.uuid4()),
operation='put',
type='collection',
key=f'{request.user}:{request.collection}',
value=json.dumps(metadata.to_dict())
)
# Broadcast collection creation to all storage backends
creation_key = (request.user, request.collection)
logger.info(f"Broadcasting create-collection for {creation_key}")
response = await self.send_config_request(config_request)
self.pending_deletions[creation_key] = {
"responses_pending": 4, # doc-embeddings, graph-embeddings, object, triples
"responses_received": [],
"all_successful": True,
"error_messages": [],
"deletion_complete": asyncio.Event()
}
if response.error:
raise RuntimeError(f"Config update failed: {response.error.message}")
storage_request = StorageManagementRequest(
operation="create-collection",
user=request.user,
collection=request.collection
)
logger.info(f"Collection {request.user}/{request.collection} updated in config service")
# Send creation requests to all storage types
if self.vector_storage_producer:
await self.vector_storage_producer.send(storage_request)
if self.object_storage_producer:
await self.object_storage_producer.send(storage_request)
if self.triples_storage_producer:
await self.triples_storage_producer.send(storage_request)
# Wait for all storage creations to complete (with timeout)
creation_info = self.pending_deletions[creation_key]
try:
await asyncio.wait_for(
creation_info["deletion_complete"].wait(),
timeout=30.0 # 30 second timeout
)
except asyncio.TimeoutError:
logger.error(f"Timeout waiting for storage creation responses for {creation_key}")
creation_info["all_successful"] = False
creation_info["error_messages"].append("Timeout waiting for storage creation")
# Check if all creations succeeded
if not creation_info["all_successful"]:
error_msg = f"Storage creation failed: {'; '.join(creation_info['error_messages'])}"
logger.error(error_msg)
# Clean up metadata on failure
await self.table_store.delete_collection(request.user, request.collection)
# Clean up tracking
del self.pending_deletions[creation_key]
return CollectionManagementResponse(
error=Error(
type="storage_creation_error",
message=error_msg
),
timestamp=datetime.now().isoformat()
)
# Clean up tracking
del self.pending_deletions[creation_key]
logger.info(f"Collection {creation_key} created successfully in all storage backends")
# Get the newly created collection for response
created_collection = await self.table_store.get_collection(request.user, request.collection)
collection_metadata = CollectionMetadata(
user=created_collection["user"],
collection=created_collection["collection"],
name=created_collection["name"],
description=created_collection["description"],
tags=created_collection["tags"],
created_at=created_collection["created_at"],
updated_at=created_collection["updated_at"]
)
else:
# Collection exists, update it
name = request.name if request.name else None
description = request.description if request.description else None
tags = list(request.tags) if request.tags else None
updated_collection = await self.table_store.update_collection(
request.user, request.collection, name, description, tags
)
collection_metadata = CollectionMetadata(
user=updated_collection["user"],
collection=updated_collection["collection"],
name=updated_collection["name"],
description=updated_collection["description"],
tags=updated_collection["tags"],
created_at="", # Not returned by update
updated_at=updated_collection["updated_at"]
)
# Config service will trigger config push automatically
# Storage services will receive update and create/update collections
return CollectionManagementResponse(
error=None,
collections=[collection_metadata],
collections=[metadata],
timestamp=datetime.now().isoformat()
)
@ -313,7 +240,7 @@ class CollectionManager:
async def delete_collection(self, request: CollectionManagementRequest) -> CollectionManagementResponse:
"""
Delete collection with cascade to all storage types
Delete collection via config service
Args:
request: Collection management request
@ -322,68 +249,25 @@ class CollectionManager:
CollectionManagementResponse indicating success or failure
"""
try:
deletion_key = (request.user, request.collection)
logger.info(f"Deleting collection {request.user}/{request.collection}")
logger.info(f"Starting cascade deletion for {request.user}/{request.collection}")
# Track this deletion request
self.pending_deletions[deletion_key] = {
"responses_pending": 4, # doc-embeddings, graph-embeddings, object, triples
"responses_received": [],
"all_successful": True,
"error_messages": [],
"deletion_complete": asyncio.Event()
}
# Create storage management request
storage_request = StorageManagementRequest(
operation="delete-collection",
user=request.user,
collection=request.collection
# Send delete request to config service
config_request = ConfigRequest(
id=str(uuid.uuid4()),
operation='delete',
type='collection',
key=f'{request.user}:{request.collection}'
)
# Send deletion requests to all storage types
if self.vector_storage_producer:
await self.vector_storage_producer.send(storage_request)
if self.object_storage_producer:
await self.object_storage_producer.send(storage_request)
if self.triples_storage_producer:
await self.triples_storage_producer.send(storage_request)
response = await self.send_config_request(config_request)
# Wait for all storage deletions to complete (with timeout)
deletion_info = self.pending_deletions[deletion_key]
try:
await asyncio.wait_for(
deletion_info["deletion_complete"].wait(),
timeout=30.0 # 30 second timeout
)
except asyncio.TimeoutError:
logger.error(f"Timeout waiting for storage deletion responses for {deletion_key}")
deletion_info["all_successful"] = False
deletion_info["error_messages"].append("Timeout waiting for storage deletion")
if response.error:
raise RuntimeError(f"Config delete failed: {response.error.message}")
# Check if all deletions succeeded
if not deletion_info["all_successful"]:
error_msg = f"Storage deletion failed: {'; '.join(deletion_info['error_messages'])}"
logger.error(error_msg)
logger.info(f"Collection {request.user}/{request.collection} deleted from config service")
# Clean up tracking
del self.pending_deletions[deletion_key]
return CollectionManagementResponse(
error=Error(
type="storage_deletion_error",
message=error_msg
),
timestamp=datetime.now().isoformat()
)
# All storage deletions succeeded, now delete metadata
logger.info(f"Storage deletions complete, removing metadata for {deletion_key}")
await self.table_store.delete_collection(request.user, request.collection)
# Clean up tracking
del self.pending_deletions[deletion_key]
# Config service will trigger config push automatically
# Storage services will receive update and delete collections
return CollectionManagementResponse(
error=None,
@ -392,39 +276,4 @@ class CollectionManager:
except Exception as e:
logger.error(f"Error deleting collection: {e}")
# Clean up tracking on error
if deletion_key in self.pending_deletions:
del self.pending_deletions[deletion_key]
raise RequestError(f"Failed to delete collection: {str(e)}")
async def on_storage_response(self, response: StorageManagementResponse):
"""
Handle storage management responses for deletion tracking
Args:
response: Storage management response
"""
logger.debug(f"Received storage response: error={response.error}")
# Find matching deletion by checking all pending deletions
# Note: This is simplified correlation - in production we'd want better correlation
for deletion_key, info in list(self.pending_deletions.items()):
if info["responses_pending"] > 0:
# Record this response
info["responses_received"].append(response)
info["responses_pending"] -= 1
# Check if this response indicates failure
if response.error and response.error.message:
info["all_successful"] = False
info["error_messages"].append(response.error.message)
logger.warning(f"Storage operation failed for {deletion_key}: {response.error.message}")
else:
logger.debug(f"Storage operation succeeded for {deletion_key}")
# If all responses received, signal completion
if info["responses_pending"] == 0:
logger.info(f"All storage responses received for {deletion_key}")
info["deletion_complete"].set()
break # Only process for first matching deletion

View file

@ -18,9 +18,8 @@ from .. schema import LibrarianRequest, LibrarianResponse, Error
from .. schema import librarian_request_queue, librarian_response_queue
from .. schema import CollectionManagementRequest, CollectionManagementResponse
from .. schema import collection_request_queue, collection_response_queue
from .. schema import StorageManagementRequest, StorageManagementResponse
from .. schema import vector_storage_management_topic, object_storage_management_topic
from .. schema import triples_storage_management_topic, storage_management_response_topic
from .. schema import ConfigRequest, ConfigResponse
from .. schema import config_request_queue, config_response_queue
from .. schema import Document, Metadata
from .. schema import TextDocument, Metadata
@ -39,6 +38,8 @@ default_librarian_request_queue = librarian_request_queue
default_librarian_response_queue = librarian_response_queue
default_collection_request_queue = collection_request_queue
default_collection_response_queue = collection_response_queue
default_config_request_queue = config_request_queue
default_config_response_queue = config_response_queue
default_minio_host = "minio:9000"
default_minio_access_key = "minioadmin"
@ -47,9 +48,6 @@ default_cassandra_host = "cassandra"
bucket_name = "library"
# FIXME: How to ensure this doesn't conflict with other usage?
keyspace = "librarian"
class Processor(AsyncProcessor):
def __init__(self, **params):
@ -74,6 +72,14 @@ class Processor(AsyncProcessor):
"collection_response_queue", default_collection_response_queue
)
config_request_queue = params.get(
"config_request_queue", default_config_request_queue
)
config_response_queue = params.get(
"config_response_queue", default_config_response_queue
)
minio_host = params.get("minio_host", default_minio_host)
minio_access_key = params.get(
"minio_access_key",
@ -87,14 +93,15 @@ class Processor(AsyncProcessor):
cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
hosts, username, password, keyspace = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password
password=cassandra_password,
default_keyspace="librarian"
)
# Store resolved configuration
self.cassandra_host = hosts
self.cassandra_username = username
@ -170,34 +177,31 @@ class Processor(AsyncProcessor):
metrics = collection_response_metrics,
)
# Storage management producers for collection deletion
self.vector_storage_producer = Producer(
client = self.pulsar_client,
topic = vector_storage_management_topic,
schema = StorageManagementRequest,
# Config service client for collection management
config_request_metrics = ProducerMetrics(
processor = id, flow = None, name = "config-request"
)
self.object_storage_producer = Producer(
self.config_request_producer = Producer(
client = self.pulsar_client,
topic = object_storage_management_topic,
schema = StorageManagementRequest,
topic = config_request_queue,
schema = ConfigRequest,
metrics = config_request_metrics,
)
self.triples_storage_producer = Producer(
client = self.pulsar_client,
topic = triples_storage_management_topic,
schema = StorageManagementRequest,
config_response_metrics = ConsumerMetrics(
processor = id, flow = None, name = "config-response"
)
self.storage_response_consumer = Consumer(
self.config_response_consumer = Consumer(
taskgroup = self.taskgroup,
client = self.pulsar_client,
flow = None,
topic = storage_management_response_topic,
subscriber = id,
schema = StorageManagementResponse,
handler = self.on_storage_response,
metrics = storage_response_metrics,
topic = config_response_queue,
subscriber = f"{id}-config",
schema = ConfigResponse,
handler = self.on_config_response,
metrics = config_response_metrics,
)
self.librarian = Librarian(
@ -213,14 +217,9 @@ class Processor(AsyncProcessor):
)
self.collection_manager = CollectionManager(
cassandra_host = self.cassandra_host,
cassandra_username = self.cassandra_username,
cassandra_password = self.cassandra_password,
keyspace = keyspace,
vector_storage_producer = self.vector_storage_producer,
object_storage_producer = self.object_storage_producer,
triples_storage_producer = self.triples_storage_producer,
storage_response_consumer = self.storage_response_consumer,
config_request_producer = self.config_request_producer,
config_response_consumer = self.config_response_consumer,
taskgroup = self.taskgroup,
)
self.register_config_handler(self.on_librarian_config)
@ -236,10 +235,12 @@ class Processor(AsyncProcessor):
await self.librarian_response_producer.start()
await self.collection_request_consumer.start()
await self.collection_response_producer.start()
await self.vector_storage_producer.start()
await self.object_storage_producer.start()
await self.triples_storage_producer.start()
await self.storage_response_consumer.start()
await self.config_request_producer.start()
await self.config_response_consumer.start()
async def on_config_response(self, message, consumer, flow):
"""Forward config responses to collection manager"""
await self.collection_manager.on_config_response(message, consumer, flow)
async def on_librarian_config(self, config, version):
@ -464,14 +465,6 @@ class Processor(AsyncProcessor):
logger.debug("Collection request processing complete")
async def on_storage_response(self, msg, consumer, flow):
"""
Handle storage management response messages
"""
v = msg.value()
logger.debug("Received storage management response")
await self.collection_manager.on_storage_response(v)
@staticmethod
def add_args(parser):

View file

@ -28,7 +28,7 @@ class Processor(TriplesQueryService):
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
hosts, username, password, keyspace = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password

View file

@ -6,11 +6,9 @@ Accepts entity/vector pairs and writes them to a Milvus store.
import logging
from .... direct.milvus_doc_embeddings import DocVectors
from .... base import DocumentEmbeddingsStoreService
from .... base import DocumentEmbeddingsStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -18,7 +16,7 @@ logger = logging.getLogger(__name__)
default_ident = "de-write"
default_store_uri = 'http://localhost:19530'
class Processor(DocumentEmbeddingsStoreService):
class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
def __init__(self, **params):
@ -32,51 +30,11 @@ class Processor(DocumentEmbeddingsStoreService):
self.vecstore = DocVectors(store_uri)
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
async def start(self):
"""Start the processor and its storage management consumer"""
await super().start()
await self.storage_request_consumer.start()
await self.storage_response_producer.start()
# Register for config push notifications
self.register_config_handler(self.on_collection_config)
async def store_document_embeddings(self, message):
# Validate collection exists before accepting writes
if not self.vecstore.collection_exists(message.metadata.user, message.metadata.collection):
error_msg = (
f"Collection {message.metadata.collection} does not exist. "
f"Create it first with tg-set-collection."
)
logger.error(error_msg)
raise ValueError(error_msg)
for emb in message.chunks:
if emb.chunk is None or emb.chunk == b"": continue
@ -102,72 +60,27 @@ class Processor(DocumentEmbeddingsStoreService):
help=f'Milvus store URI (default: {default_store_uri})'
)
async def on_storage_management(self, message, consumer, flow):
"""Handle storage management requests"""
request = message.value()
logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}")
try:
if request.operation == "create-collection":
await self.handle_create_collection(request)
elif request.operation == "delete-collection":
await self.handle_delete_collection(request)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {request.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_create_collection(self, request):
async def create_collection(self, user: str, collection: str, metadata: dict):
"""
No-op for collection creation - collections are created lazily on first write
Create collection via config push - collections are created lazily on first write
with the correct dimension determined from the actual embeddings.
"""
try:
logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write")
self.vecstore.create_collection(request.user, request.collection)
# Send success response
response = StorageManagementResponse(error=None)
await self.storage_response_producer.send(response)
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
self.vecstore.create_collection(user, collection)
except Exception as e:
logger.error(f"Failed to handle create collection request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="creation_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
raise
async def handle_delete_collection(self, request):
"""Delete the collection for document embeddings"""
async def delete_collection(self, user: str, collection: str):
"""Delete the collection for document embeddings via config push"""
try:
self.vecstore.delete_collection(request.user, request.collection)
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {request.user}/{request.collection}")
self.vecstore.delete_collection(user, collection)
logger.info(f"Successfully deleted collection {user}/{collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
raise
def run():

View file

@ -11,11 +11,9 @@ import uuid
import os
import logging
from .... base import DocumentEmbeddingsStoreService
from .... base import DocumentEmbeddingsStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -25,7 +23,7 @@ default_api_key = os.getenv("PINECONE_API_KEY", "not-specified")
default_cloud = "aws"
default_region = "us-east-1"
class Processor(DocumentEmbeddingsStoreService):
class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
def __init__(self, **params):
@ -59,33 +57,8 @@ class Processor(DocumentEmbeddingsStoreService):
self.last_index_name = None
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
# Register for config push notifications
self.register_config_handler(self.on_collection_config)
def create_index(self, index_name, dim):
@ -115,12 +88,6 @@ class Processor(DocumentEmbeddingsStoreService):
"Gave up waiting for index creation"
)
async def start(self):
"""Start the processor and its storage management consumer"""
await super().start()
await self.storage_request_consumer.start()
await self.storage_response_producer.start()
async def store_document_embeddings(self, message):
for emb in message.chunks:
@ -188,65 +155,22 @@ class Processor(DocumentEmbeddingsStoreService):
help=f'Pinecone region, (default: {default_region}'
)
async def on_storage_management(self, message, consumer, flow):
"""Handle storage management requests"""
request = message.value()
logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}")
try:
if request.operation == "create-collection":
await self.handle_create_collection(request)
elif request.operation == "delete-collection":
await self.handle_delete_collection(request)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {request.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_create_collection(self, request):
async def create_collection(self, user: str, collection: str, metadata: dict):
"""
No-op for collection creation - indexes are created lazily on first write
Create collection via config push - indexes are created lazily on first write
with the correct dimension determined from the actual embeddings.
"""
try:
logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write")
# Send success response
response = StorageManagementResponse(error=None)
await self.storage_response_producer.send(response)
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
except Exception as e:
logger.error(f"Failed to handle create collection request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="creation_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
raise
async def handle_delete_collection(self, request):
"""
Delete all dimension variants of the index for document embeddings.
Since indexes are created with dimension suffixes (e.g., d-user-coll-384),
we need to find and delete all matching indexes.
"""
async def delete_collection(self, user: str, collection: str):
"""Delete the collection for document embeddings via config push"""
try:
prefix = f"d-{request.user}-{request.collection}-"
prefix = f"d-{user}-{collection}-"
# Get all indexes and filter for matches
all_indexes = self.pinecone.list_indexes()
@ -261,16 +185,10 @@ class Processor(DocumentEmbeddingsStoreService):
for index_name in matching_indexes:
self.pinecone.delete_index(index_name)
logger.info(f"Deleted Pinecone index: {index_name}")
logger.info(f"Deleted {len(matching_indexes)} index(es) for {request.user}/{request.collection}")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Deleted {len(matching_indexes)} index(es) for {user}/{collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
raise
def run():

View file

@ -9,11 +9,9 @@ from qdrant_client.models import Distance, VectorParams
import uuid
import logging
from .... base import DocumentEmbeddingsStoreService
from .... base import DocumentEmbeddingsStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -22,7 +20,7 @@ default_ident = "de-write"
default_store_uri = 'http://localhost:6333'
class Processor(DocumentEmbeddingsStoreService):
class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
def __init__(self, **params):
@ -38,44 +36,8 @@ class Processor(DocumentEmbeddingsStoreService):
self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
# Set up storage management if base class attributes are available
# (they may not be in unit tests)
if hasattr(self, 'id') and hasattr(self, 'taskgroup') and hasattr(self, 'pulsar_client'):
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
async def start(self):
"""Start the processor and its storage management consumer"""
await super().start()
if hasattr(self, 'storage_request_consumer'):
await self.storage_request_consumer.start()
if hasattr(self, 'storage_response_producer'):
await self.storage_response_producer.start()
# Register for config push notifications
self.register_config_handler(self.on_collection_config)
async def store_document_embeddings(self, message):
@ -133,65 +95,22 @@ class Processor(DocumentEmbeddingsStoreService):
help=f'Qdrant API key (default: None)'
)
async def on_storage_management(self, message, consumer, flow):
"""Handle storage management requests"""
request = message.value()
logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}")
try:
if request.operation == "create-collection":
await self.handle_create_collection(request)
elif request.operation == "delete-collection":
await self.handle_delete_collection(request)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {request.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_create_collection(self, request):
async def create_collection(self, user: str, collection: str, metadata: dict):
"""
No-op for collection creation - collections are created lazily on first write
Create collection via config push - collections are created lazily on first write
with the correct dimension determined from the actual embeddings.
"""
try:
logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write")
# Send success response
response = StorageManagementResponse(error=None)
await self.storage_response_producer.send(response)
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
except Exception as e:
logger.error(f"Failed to handle create collection request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="creation_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
raise
async def handle_delete_collection(self, request):
"""
Delete all dimension variants of the collection for document embeddings.
Since collections are created with dimension suffixes (e.g., d_user_coll_384),
we need to find and delete all matching collections.
"""
async def delete_collection(self, user: str, collection: str):
"""Delete the collection for document embeddings via config push"""
try:
prefix = f"d_{request.user}_{request.collection}_"
prefix = f"d_{user}_{collection}_"
# Get all collections and filter for matches
all_collections = self.qdrant.get_collections().collections
@ -206,16 +125,10 @@ class Processor(DocumentEmbeddingsStoreService):
for collection_name in matching_collections:
self.qdrant.delete_collection(collection_name)
logger.info(f"Deleted Qdrant collection: {collection_name}")
logger.info(f"Deleted {len(matching_collections)} collection(s) for {request.user}/{request.collection}")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Deleted {len(matching_collections)} collection(s) for {user}/{collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
raise
def run():

View file

@ -6,11 +6,9 @@ Accepts entity/vector pairs and writes them to a Milvus store.
import logging
from .... direct.milvus_graph_embeddings import EntityVectors
from .... base import GraphEmbeddingsStoreService
from .... base import GraphEmbeddingsStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -18,7 +16,7 @@ logger = logging.getLogger(__name__)
default_ident = "ge-write"
default_store_uri = 'http://localhost:19530'
class Processor(GraphEmbeddingsStoreService):
class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
def __init__(self, **params):
@ -32,51 +30,11 @@ class Processor(GraphEmbeddingsStoreService):
self.vecstore = EntityVectors(store_uri)
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
async def start(self):
"""Start the processor and its storage management consumer"""
await super().start()
await self.storage_request_consumer.start()
await self.storage_response_producer.start()
# Register for config push notifications
self.register_config_handler(self.on_collection_config)
async def store_graph_embeddings(self, message):
# Validate collection exists before accepting writes
if not self.vecstore.collection_exists(message.metadata.user, message.metadata.collection):
error_msg = (
f"Collection {message.metadata.collection} does not exist. "
f"Create it first with tg-set-collection."
)
logger.error(error_msg)
raise ValueError(error_msg)
for entity in message.entities:
if entity.entity.value != "" and entity.entity.value is not None:
@ -98,72 +56,27 @@ class Processor(GraphEmbeddingsStoreService):
help=f'Milvus store URI (default: {default_store_uri})'
)
async def on_storage_management(self, message, consumer, flow):
"""Handle storage management requests"""
request = message.value()
logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}")
try:
if request.operation == "create-collection":
await self.handle_create_collection(request)
elif request.operation == "delete-collection":
await self.handle_delete_collection(request)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {request.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_create_collection(self, request):
async def create_collection(self, user: str, collection: str, metadata: dict):
"""
No-op for collection creation - collections are created lazily on first write
Create collection via config push - collections are created lazily on first write
with the correct dimension determined from the actual embeddings.
"""
try:
logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write")
self.vecstore.create_collection(request.user, request.collection)
# Send success response
response = StorageManagementResponse(error=None)
await self.storage_response_producer.send(response)
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
self.vecstore.create_collection(user, collection)
except Exception as e:
logger.error(f"Failed to handle create collection request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="creation_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
raise
async def handle_delete_collection(self, request):
"""Delete the collection for graph embeddings"""
async def delete_collection(self, user: str, collection: str):
"""Delete the collection for graph embeddings via config push"""
try:
self.vecstore.delete_collection(request.user, request.collection)
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {request.user}/{request.collection}")
self.vecstore.delete_collection(user, collection)
logger.info(f"Successfully deleted collection {user}/{collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
raise
def run():

View file

@ -11,11 +11,9 @@ import uuid
import os
import logging
from .... base import GraphEmbeddingsStoreService
from .... base import GraphEmbeddingsStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -25,7 +23,7 @@ default_api_key = os.getenv("PINECONE_API_KEY", "not-specified")
default_cloud = "aws"
default_region = "us-east-1"
class Processor(GraphEmbeddingsStoreService):
class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
def __init__(self, **params):
@ -59,33 +57,8 @@ class Processor(GraphEmbeddingsStoreService):
self.last_index_name = None
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
# Register for config push notifications
self.register_config_handler(self.on_collection_config)
def create_index(self, index_name, dim):
@ -115,12 +88,6 @@ class Processor(GraphEmbeddingsStoreService):
"Gave up waiting for index creation"
)
async def start(self):
"""Start the processor and its storage management consumer"""
await super().start()
await self.storage_request_consumer.start()
await self.storage_response_producer.start()
async def store_graph_embeddings(self, message):
for entity in message.entities:
@ -186,65 +153,22 @@ class Processor(GraphEmbeddingsStoreService):
help=f'Pinecone region, (default: {default_region}'
)
async def on_storage_management(self, message, consumer, flow):
"""Handle storage management requests"""
request = message.value()
logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}")
try:
if request.operation == "create-collection":
await self.handle_create_collection(request)
elif request.operation == "delete-collection":
await self.handle_delete_collection(request)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {request.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_create_collection(self, request):
async def create_collection(self, user: str, collection: str, metadata: dict):
"""
No-op for collection creation - indexes are created lazily on first write
Create collection via config push - indexes are created lazily on first write
with the correct dimension determined from the actual embeddings.
"""
try:
logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write")
# Send success response
response = StorageManagementResponse(error=None)
await self.storage_response_producer.send(response)
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
except Exception as e:
logger.error(f"Failed to handle create collection request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="creation_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
raise
async def handle_delete_collection(self, request):
"""
Delete all dimension variants of the index for graph embeddings.
Since indexes are created with dimension suffixes (e.g., t-user-coll-384),
we need to find and delete all matching indexes.
"""
async def delete_collection(self, user: str, collection: str):
"""Delete the collection for graph embeddings via config push"""
try:
prefix = f"t-{request.user}-{request.collection}-"
prefix = f"t-{user}-{collection}-"
# Get all indexes and filter for matches
all_indexes = self.pinecone.list_indexes()
@ -259,16 +183,10 @@ class Processor(GraphEmbeddingsStoreService):
for index_name in matching_indexes:
self.pinecone.delete_index(index_name)
logger.info(f"Deleted Pinecone index: {index_name}")
logger.info(f"Deleted {len(matching_indexes)} index(es) for {request.user}/{request.collection}")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Deleted {len(matching_indexes)} index(es) for {user}/{collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
raise
def run():

View file

@ -9,11 +9,9 @@ from qdrant_client.models import Distance, VectorParams
import uuid
import logging
from .... base import GraphEmbeddingsStoreService
from .... base import GraphEmbeddingsStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import vector_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -22,7 +20,7 @@ default_ident = "ge-write"
default_store_uri = 'http://localhost:6333'
class Processor(GraphEmbeddingsStoreService):
class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
def __init__(self, **params):
@ -38,44 +36,8 @@ class Processor(GraphEmbeddingsStoreService):
self.qdrant = QdrantClient(url=store_uri, api_key=api_key)
# Set up storage management if base class attributes are available
# (they may not be in unit tests)
if hasattr(self, 'id') and hasattr(self, 'taskgroup') and hasattr(self, 'pulsar_client'):
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=vector_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
async def start(self):
"""Start the processor and its storage management consumer"""
await super().start()
if hasattr(self, 'storage_request_consumer'):
await self.storage_request_consumer.start()
if hasattr(self, 'storage_response_producer'):
await self.storage_response_producer.start()
# Register for config push notifications
self.register_config_handler(self.on_collection_config)
async def store_graph_embeddings(self, message):
@ -132,65 +94,22 @@ class Processor(GraphEmbeddingsStoreService):
help=f'Qdrant API key'
)
async def on_storage_management(self, message, consumer, flow):
"""Handle storage management requests"""
request = message.value()
logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}")
try:
if request.operation == "create-collection":
await self.handle_create_collection(request)
elif request.operation == "delete-collection":
await self.handle_delete_collection(request)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {request.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_create_collection(self, request):
async def create_collection(self, user: str, collection: str, metadata: dict):
"""
No-op for collection creation - collections are created lazily on first write
Create collection via config push - collections are created lazily on first write
with the correct dimension determined from the actual embeddings.
"""
try:
logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write")
# Send success response
response = StorageManagementResponse(error=None)
await self.storage_response_producer.send(response)
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
except Exception as e:
logger.error(f"Failed to handle create collection request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="creation_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
raise
async def handle_delete_collection(self, request):
"""
Delete all dimension variants of the collection for graph embeddings.
Since collections are created with dimension suffixes (e.g., t_user_coll_384),
we need to find and delete all matching collections.
"""
async def delete_collection(self, user: str, collection: str):
"""Delete the collection for graph embeddings via config push"""
try:
prefix = f"t_{request.user}_{request.collection}_"
prefix = f"t_{user}_{collection}_"
# Get all collections and filter for matches
all_collections = self.qdrant.get_collections().collections
@ -205,16 +124,10 @@ class Processor(GraphEmbeddingsStoreService):
for collection_name in matching_collections:
self.qdrant.delete_collection(collection_name)
logger.info(f"Deleted Qdrant collection: {collection_name}")
logger.info(f"Deleted {len(matching_collections)} collection(s) for {request.user}/{request.collection}")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Deleted {len(matching_collections)} collection(s) for {user}/{collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
raise
def run():

View file

@ -23,10 +23,11 @@ class Processor(FlowProcessor):
id = params.get("id")
# Use helper to resolve configuration
hosts, username, password = resolve_cassandra_config(
hosts, username, password, keyspace = resolve_cassandra_config(
host=params.get("cassandra_host"),
username=params.get("cassandra_username"),
password=params.get("cassandra_password")
password=params.get("cassandra_password"),
default_keyspace='knowledge'
)
super(Processor, self).__init__(

View file

@ -35,7 +35,7 @@ class Processor(FlowProcessor):
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
hosts, username, password, keyspace = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password
@ -55,7 +55,7 @@ class Processor(FlowProcessor):
"config_type": self.config_key,
}
)
self.register_specification(
ConsumerSpec(
name = "input",
@ -341,13 +341,6 @@ class Processor(FlowProcessor):
except Exception as e:
logger.warning(f"Failed to convert value {value} to type {field_type}: {e}")
return str(value)
async def start(self):
"""Start the processor and its storage management consumer"""
await super().start()
await self.storage_request_consumer.start()
await self.storage_response_producer.start()
async def on_object(self, msg, consumer, flow):
"""Process incoming ExtractedObject and store in Cassandra"""
@ -368,7 +361,7 @@ class Processor(FlowProcessor):
if result is None or not result.one():
error_msg = (
f"Collection {obj.metadata.collection} does not exist. "
f"Create it first with tg-set-collection."
f"Create it first via collection management API."
)
logger.error(error_msg)
raise ValueError(error_msg)

View file

@ -11,12 +11,10 @@ import time
import logging
from .... direct.cassandra_kg import KnowledgeGraph
from .... base import TriplesStoreService
from .... base import TriplesStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import triples_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -24,10 +22,10 @@ logger = logging.getLogger(__name__)
default_ident = "triples-write"
class Processor(TriplesStoreService):
class Processor(CollectionConfigHandler, TriplesStoreService):
def __init__(self, **params):
id = params.get("id", default_ident)
# Get Cassandra parameters
@ -36,7 +34,7 @@ class Processor(TriplesStoreService):
cassandra_password = params.get("cassandra_password")
# Resolve configuration with environment variable fallback
hosts, username, password = resolve_cassandra_config(
hosts, username, password, keyspace = resolve_cassandra_config(
host=cassandra_host,
username=cassandra_username,
password=cassandra_password
@ -48,39 +46,15 @@ class Processor(TriplesStoreService):
"cassandra_username": username
}
)
self.cassandra_host = hosts
self.cassandra_username = username
self.cassandra_password = password
self.table = None
self.tg = None
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=triples_storage_management_topic,
subscriber=f"{id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
# Register for config push notifications
self.register_config_handler(self.on_collection_config)
async def store_triples(self, message):
@ -109,15 +83,6 @@ class Processor(TriplesStoreService):
self.table = user
# Validate collection exists before accepting writes
if not self.tg.collection_exists(message.metadata.collection):
error_msg = (
f"Collection {message.metadata.collection} does not exist. "
f"Create it first with tg-set-collection."
)
logger.error(error_msg)
raise ValueError(error_msg)
for t in message.triples:
self.tg.insert(
message.metadata.collection,
@ -126,133 +91,77 @@ class Processor(TriplesStoreService):
t.o.value
)
async def start(self):
"""Start the processor and its storage management consumer"""
await super().start()
await self.storage_request_consumer.start()
await self.storage_response_producer.start()
async def on_storage_management(self, message, consumer, flow):
"""Handle storage management requests"""
request = message.value()
logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}")
try:
if request.operation == "create-collection":
await self.handle_create_collection(request)
elif request.operation == "delete-collection":
await self.handle_delete_collection(request)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {request.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_create_collection(self, request):
"""Create a collection in Cassandra triple store"""
async def create_collection(self, user: str, collection: str, metadata: dict):
"""Create a collection in Cassandra triple store via config push"""
try:
# Create or reuse connection for this user's keyspace
if self.table is None or self.table != request.user:
if self.table is None or self.table != user:
self.tg = None
try:
if self.cassandra_username and self.cassandra_password:
self.tg = KnowledgeGraph(
hosts=self.cassandra_host,
keyspace=request.user,
keyspace=user,
username=self.cassandra_username,
password=self.cassandra_password
)
else:
self.tg = KnowledgeGraph(
hosts=self.cassandra_host,
keyspace=request.user,
keyspace=user,
)
except Exception as e:
logger.error(f"Failed to connect to Cassandra for user {request.user}: {e}")
logger.error(f"Failed to connect to Cassandra for user {user}: {e}")
raise
self.table = request.user
self.table = user
# Create collection using the built-in method
logger.info(f"Creating collection {request.collection} for user {request.user}")
logger.info(f"Creating collection {collection} for user {user}")
if self.tg.collection_exists(request.collection):
logger.info(f"Collection {request.collection} already exists")
if self.tg.collection_exists(collection):
logger.info(f"Collection {collection} already exists")
else:
self.tg.create_collection(request.collection)
logger.info(f"Created collection {request.collection}")
# Send success response
response = StorageManagementResponse(error=None)
await self.storage_response_producer.send(response)
self.tg.create_collection(collection)
logger.info(f"Created collection {collection}")
except Exception as e:
logger.error(f"Failed to create collection: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="creation_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
raise
async def handle_delete_collection(self, request):
async def delete_collection(self, user: str, collection: str):
"""Delete all data for a specific collection from the unified triples table"""
try:
# Create or reuse connection for this user's keyspace
if self.table is None or self.table != request.user:
if self.table is None or self.table != user:
self.tg = None
try:
if self.cassandra_username and self.cassandra_password:
self.tg = KnowledgeGraph(
hosts=self.cassandra_host,
keyspace=request.user,
keyspace=user,
username=self.cassandra_username,
password=self.cassandra_password
)
else:
self.tg = KnowledgeGraph(
hosts=self.cassandra_host,
keyspace=request.user,
keyspace=user,
)
except Exception as e:
logger.error(f"Failed to connect to Cassandra for user {request.user}: {e}")
logger.error(f"Failed to connect to Cassandra for user {user}: {e}")
raise
self.table = request.user
self.table = user
# Delete all triples for this collection using the built-in method
try:
self.tg.delete_collection(request.collection)
logger.info(f"Deleted all triples for collection {request.collection} from keyspace {request.user}")
except Exception as e:
logger.error(f"Failed to delete collection data: {e}")
raise
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {request.user}/{request.collection}")
self.tg.delete_collection(collection)
logger.info(f"Deleted all triples for collection {collection} from keyspace {user}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
raise
@staticmethod

View file

@ -12,11 +12,9 @@ import logging
from falkordb import FalkorDB
from .... base import TriplesStoreService
from .... base import TriplesStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import triples_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -26,10 +24,10 @@ default_ident = "triples-write"
default_graph_url = 'falkor://falkordb:6379'
default_database = 'falkordb'
class Processor(TriplesStoreService):
class Processor(CollectionConfigHandler, TriplesStoreService):
def __init__(self, **params):
graph_url = params.get("graph_url", default_graph_url)
database = params.get("database", default_database)
@ -44,33 +42,8 @@ class Processor(TriplesStoreService):
self.io = FalkorDB.from_url(graph_url).select_graph(database)
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=triples_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
# Register for config push notifications
self.register_config_handler(self.on_collection_config)
def create_node(self, uri, user, collection):
@ -184,7 +157,7 @@ class Processor(TriplesStoreService):
if not self.collection_exists(user, collection):
error_msg = (
f"Collection {collection} does not exist. "
f"Create it first with tg-set-collection."
f"Create it first via collection management API."
)
logger.error(error_msg)
raise ValueError(error_msg)
@ -217,95 +190,58 @@ class Processor(TriplesStoreService):
help=f'FalkorDB database (default: {default_database})'
)
async def start(self):
"""Start the processor and its storage management consumer"""
await super().start()
await self.storage_request_consumer.start()
await self.storage_response_producer.start()
async def on_storage_management(self, message, consumer, flow):
"""Handle storage management requests"""
request = message.value()
logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}")
async def create_collection(self, user: str, collection: str, metadata: dict):
"""Create collection metadata in FalkorDB via config push"""
try:
if request.operation == "create-collection":
await self.handle_create_collection(request)
elif request.operation == "delete-collection":
await self.handle_delete_collection(request)
# Check if collection exists
result = self.io.query(
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) RETURN c LIMIT 1",
params={"user": user, "collection": collection}
)
if result.result_set:
logger.info(f"Collection {user}/{collection} already exists")
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {request.operation}"
)
# Create collection metadata node
import datetime
self.io.query(
"MERGE (c:CollectionMetadata {user: $user, collection: $collection}) "
"SET c.created_at = $created_at",
params={
"user": user,
"collection": collection,
"created_at": datetime.datetime.now().isoformat()
}
)
await self.storage_response_producer.send(response)
logger.info(f"Created collection {user}/{collection}")
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
raise
async def handle_create_collection(self, request):
"""Create collection metadata in FalkorDB"""
try:
if self.collection_exists(request.user, request.collection):
logger.info(f"Collection {request.user}/{request.collection} already exists")
else:
self.create_collection(request.user, request.collection)
logger.info(f"Created collection {request.user}/{request.collection}")
# Send success response
response = StorageManagementResponse(error=None)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Failed to create collection: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="creation_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, request):
"""Delete the collection for FalkorDB triples"""
async def delete_collection(self, user: str, collection: str):
"""Delete the collection for FalkorDB triples via config push"""
try:
# Delete all nodes and literals for this user/collection
node_result = self.io.query(
"MATCH (n:Node {user: $user, collection: $collection}) DETACH DELETE n",
params={"user": request.user, "collection": request.collection}
params={"user": user, "collection": collection}
)
literal_result = self.io.query(
"MATCH (n:Literal {user: $user, collection: $collection}) DETACH DELETE n",
params={"user": request.user, "collection": request.collection}
params={"user": user, "collection": collection}
)
# Delete collection metadata node
metadata_result = self.io.query(
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) DELETE c",
params={"user": request.user, "collection": request.collection}
params={"user": user, "collection": collection}
)
logger.info(f"Deleted {node_result.nodes_deleted} nodes, {literal_result.nodes_deleted} literals, and {metadata_result.nodes_deleted} metadata nodes for collection {request.user}/{request.collection}")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {request.user}/{request.collection}")
logger.info(f"Deleted {node_result.nodes_deleted} nodes, {literal_result.nodes_deleted} literals, and {metadata_result.nodes_deleted} metadata nodes for collection {user}/{collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
raise
def run():

View file

@ -12,11 +12,9 @@ import logging
from neo4j import GraphDatabase
from .... base import TriplesStoreService
from .... base import TriplesStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import triples_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -28,10 +26,10 @@ default_username = 'memgraph'
default_password = 'password'
default_database = 'memgraph'
class Processor(TriplesStoreService):
class Processor(CollectionConfigHandler, TriplesStoreService):
def __init__(self, **params):
graph_host = params.get("graph_host", default_graph_host)
username = params.get("username", default_username)
password = params.get("password", default_password)
@ -53,33 +51,8 @@ class Processor(TriplesStoreService):
with self.io.session(database=self.db) as session:
self.create_indexes(session)
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=triples_storage_management_topic,
subscriber=f"{self.id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
# Register for config push notifications
self.register_config_handler(self.on_collection_config)
def create_indexes(self, session):
@ -267,28 +240,6 @@ class Processor(TriplesStoreService):
src=t.s.value, dest=t.o.value, uri=t.p.value, user=user, collection=collection,
)
def collection_exists(self, user, collection):
"""Check if collection metadata node exists"""
with self.io.session(database=self.db) as session:
result = session.run(
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) "
"RETURN c LIMIT 1",
user=user, collection=collection
)
return bool(list(result))
def create_collection(self, user, collection):
"""Create collection metadata node"""
import datetime
with self.io.session(database=self.db) as session:
session.run(
"MERGE (c:CollectionMetadata {user: $user, collection: $collection}) "
"SET c.created_at = $created_at",
user=user, collection=collection,
created_at=datetime.datetime.now().isoformat()
)
logger.info(f"Created collection metadata node for {user}/{collection}")
async def store_triples(self, message):
# Extract user and collection from metadata
@ -299,7 +250,7 @@ class Processor(TriplesStoreService):
if not self.collection_exists(user, collection):
error_msg = (
f"Collection {collection} does not exist. "
f"Create it first with tg-set-collection."
f"Create it first via collection management API."
)
logger.error(error_msg)
raise ValueError(error_msg)
@ -348,73 +299,50 @@ class Processor(TriplesStoreService):
help=f'Memgraph database (default: {default_database})'
)
async def start(self):
"""Start the processor and its storage management consumer"""
await super().start()
await self.storage_request_consumer.start()
await self.storage_response_producer.start()
def _collection_exists_in_db(self, user, collection):
"""Check if collection metadata node exists"""
with self.io.session(database=self.db) as session:
result = session.run(
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) "
"RETURN c LIMIT 1",
user=user, collection=collection
)
return bool(list(result))
async def on_storage_management(self, message, consumer, flow):
"""Handle storage management requests"""
request = message.value()
logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}")
def _create_collection_in_db(self, user, collection):
"""Create collection metadata node"""
import datetime
with self.io.session(database=self.db) as session:
session.run(
"MERGE (c:CollectionMetadata {user: $user, collection: $collection}) "
"SET c.created_at = $created_at",
user=user, collection=collection,
created_at=datetime.datetime.now().isoformat()
)
logger.info(f"Created collection metadata node for {user}/{collection}")
async def create_collection(self, user: str, collection: str, metadata: dict):
"""Create collection metadata in Memgraph via config push"""
try:
if request.operation == "create-collection":
await self.handle_create_collection(request)
elif request.operation == "delete-collection":
await self.handle_delete_collection(request)
if self._collection_exists_in_db(user, collection):
logger.info(f"Collection {user}/{collection} already exists")
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {request.operation}"
)
)
await self.storage_response_producer.send(response)
self._create_collection_in_db(user, collection)
logger.info(f"Created collection {user}/{collection}")
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
raise
async def handle_create_collection(self, request):
"""Create collection metadata in Memgraph"""
try:
if self.collection_exists(request.user, request.collection):
logger.info(f"Collection {request.user}/{request.collection} already exists")
else:
self.create_collection(request.user, request.collection)
logger.info(f"Created collection {request.user}/{request.collection}")
# Send success response
response = StorageManagementResponse(error=None)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Failed to create collection: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="creation_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def handle_delete_collection(self, request):
"""Delete all data for a specific collection"""
async def delete_collection(self, user: str, collection: str):
"""Delete all data for a specific collection via config push"""
try:
with self.io.session(database=self.db) as session:
# Delete all nodes for this user and collection
node_result = session.run(
"MATCH (n:Node {user: $user, collection: $collection}) "
"DETACH DELETE n",
user=request.user, collection=request.collection
user=user, collection=collection
)
nodes_deleted = node_result.consume().counters.nodes_deleted
@ -422,7 +350,7 @@ class Processor(TriplesStoreService):
literal_result = session.run(
"MATCH (n:Literal {user: $user, collection: $collection}) "
"DETACH DELETE n",
user=request.user, collection=request.collection
user=user, collection=collection
)
literals_deleted = literal_result.consume().counters.nodes_deleted
@ -430,20 +358,13 @@ class Processor(TriplesStoreService):
metadata_result = session.run(
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) "
"DELETE c",
user=request.user, collection=request.collection
user=user, collection=collection
)
metadata_deleted = metadata_result.consume().counters.nodes_deleted
# Note: Relationships are automatically deleted with DETACH DELETE
logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {request.user}/{request.collection}")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {request.user}/{request.collection}")
logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {user}/{collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")

View file

@ -11,11 +11,9 @@ import time
import logging
from neo4j import GraphDatabase
from .... base import TriplesStoreService
from .... base import TriplesStoreService, CollectionConfigHandler
from .... base import AsyncProcessor, Consumer, Producer
from .... base import ConsumerMetrics, ProducerMetrics
from .... schema import StorageManagementRequest, StorageManagementResponse, Error
from .... schema import triples_storage_management_topic, storage_management_response_topic
# Module logger
logger = logging.getLogger(__name__)
@ -27,10 +25,10 @@ default_username = 'neo4j'
default_password = 'password'
default_database = 'neo4j'
class Processor(TriplesStoreService):
class Processor(CollectionConfigHandler, TriplesStoreService):
def __init__(self, **params):
id = params.get("id", default_ident)
graph_host = params.get("graph_host", default_graph_host)
@ -53,33 +51,8 @@ class Processor(TriplesStoreService):
with self.io.session(database=self.db) as session:
self.create_indexes(session)
# Set up metrics for storage management
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Set up consumer for storage management requests
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
client=self.pulsar_client,
flow=None,
topic=triples_storage_management_topic,
subscriber=f"{id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Set up producer for storage management responses
self.storage_response_producer = Producer(
client=self.pulsar_client,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
# Register for config push notifications
self.register_config_handler(self.on_collection_config)
def create_indexes(self, session):
@ -232,7 +205,7 @@ class Processor(TriplesStoreService):
if not self.collection_exists(user, collection):
error_msg = (
f"Collection {collection} does not exist. "
f"Create it first with tg-set-collection."
f"Create it first via collection management API."
)
logger.error(error_msg)
raise ValueError(error_msg)
@ -277,42 +250,7 @@ class Processor(TriplesStoreService):
help=f'Neo4j database (default: {default_database})'
)
async def start(self):
"""Start the processor and its storage management consumer"""
await super().start()
await self.storage_request_consumer.start()
await self.storage_response_producer.start()
async def on_storage_management(self, message, consumer, flow):
"""Handle storage management requests"""
request = message.value()
logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}")
try:
if request.operation == "create-collection":
await self.handle_create_collection(request)
elif request.operation == "delete-collection":
await self.handle_delete_collection(request)
else:
response = StorageManagementResponse(
error=Error(
type="invalid_operation",
message=f"Unknown operation: {request.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error processing storage management request: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
def collection_exists(self, user, collection):
def _collection_exists_in_db(self, user, collection):
"""Check if collection metadata node exists"""
with self.io.session(database=self.db) as session:
result = session.run(
@ -322,7 +260,7 @@ class Processor(TriplesStoreService):
)
return bool(list(result))
def create_collection(self, user, collection):
def _create_collection_in_db(self, user, collection):
"""Create collection metadata node"""
import datetime
with self.io.session(database=self.db) as session:
@ -334,38 +272,28 @@ class Processor(TriplesStoreService):
)
logger.info(f"Created collection metadata node for {user}/{collection}")
async def handle_create_collection(self, request):
"""Create collection metadata in Neo4j"""
async def create_collection(self, user: str, collection: str, metadata: dict):
"""Create collection metadata in Neo4j via config push"""
try:
if self.collection_exists(request.user, request.collection):
logger.info(f"Collection {request.user}/{request.collection} already exists")
if self._collection_exists_in_db(user, collection):
logger.info(f"Collection {user}/{collection} already exists")
else:
self.create_collection(request.user, request.collection)
logger.info(f"Created collection {request.user}/{request.collection}")
# Send success response
response = StorageManagementResponse(error=None)
await self.storage_response_producer.send(response)
self._create_collection_in_db(user, collection)
logger.info(f"Created collection {user}/{collection}")
except Exception as e:
logger.error(f"Failed to create collection: {e}", exc_info=True)
response = StorageManagementResponse(
error=Error(
type="creation_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
raise
async def handle_delete_collection(self, request):
"""Delete all data for a specific collection"""
async def delete_collection(self, user: str, collection: str):
"""Delete all data for a specific collection via config push"""
try:
with self.io.session(database=self.db) as session:
# Delete all nodes for this user and collection
node_result = session.run(
"MATCH (n:Node {user: $user, collection: $collection}) "
"DETACH DELETE n",
user=request.user, collection=request.collection
user=user, collection=collection
)
nodes_deleted = node_result.consume().counters.nodes_deleted
@ -373,7 +301,7 @@ class Processor(TriplesStoreService):
literal_result = session.run(
"MATCH (n:Literal {user: $user, collection: $collection}) "
"DETACH DELETE n",
user=request.user, collection=request.collection
user=user, collection=collection
)
literals_deleted = literal_result.consume().counters.nodes_deleted
@ -383,21 +311,14 @@ class Processor(TriplesStoreService):
metadata_result = session.run(
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) "
"DELETE c",
user=request.user, collection=request.collection
user=user, collection=collection
)
metadata_deleted = metadata_result.consume().counters.nodes_deleted
logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {request.user}/{request.collection}")
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {request.user}/{request.collection}")
logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {user}/{collection}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
raise
def run():

View file

@ -111,21 +111,6 @@ class LibraryTableStore:
);
""");
logger.debug("collections table...")
self.cassandra.execute("""
CREATE TABLE IF NOT EXISTS collections (
user text,
collection text,
name text,
description text,
tags set<text>,
created_at timestamp,
updated_at timestamp,
PRIMARY KEY (user, collection)
);
""");
logger.info("Cassandra schema OK.")
def prepare_statements(self):
@ -202,43 +187,6 @@ class LibraryTableStore:
LIMIT 1
""")
# Collection management statements
self.insert_collection_stmt = self.cassandra.prepare("""
INSERT INTO collections
(user, collection, name, description, tags, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""")
self.update_collection_stmt = self.cassandra.prepare("""
UPDATE collections
SET name = ?, description = ?, tags = ?, updated_at = ?
WHERE user = ? AND collection = ?
""")
self.get_collection_stmt = self.cassandra.prepare("""
SELECT collection, name, description, tags, created_at, updated_at
FROM collections
WHERE user = ? AND collection = ?
""")
self.list_collections_stmt = self.cassandra.prepare("""
SELECT collection, name, description, tags, created_at, updated_at
FROM collections
WHERE user = ?
""")
self.delete_collection_stmt = self.cassandra.prepare("""
DELETE FROM collections
WHERE user = ? AND collection = ?
""")
self.collection_exists_stmt = self.cassandra.prepare("""
SELECT collection
FROM collections
WHERE user = ? AND collection = ?
LIMIT 1
""")
self.list_processing_stmt = self.cassandra.prepare("""
SELECT
id, document_id, time, flow, collection, tags
@ -572,146 +520,3 @@ class LibraryTableStore:
logger.debug("Done")
return lst
# Collection management methods
async def ensure_collection_exists(self, user, collection):
"""Ensure collection metadata record exists, create if not"""
try:
resp = await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.collection_exists_stmt, [user, collection]
)
if resp:
return
import datetime
now = datetime.datetime.now()
await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.insert_collection_stmt,
[user, collection, collection, "", set(), now, now]
)
logger.debug(f"Created collection metadata for {user}/{collection}")
except Exception as e:
logger.error(f"Error ensuring collection exists: {e}")
raise
async def list_collections(self, user, tag_filter=None):
"""List collections for a user, optionally filtered by tags"""
try:
resp = await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.list_collections_stmt, [user]
)
collections = []
for row in resp:
collection_data = {
"user": user,
"collection": row[0],
"name": row[1] or row[0],
"description": row[2] or "",
"tags": list(row[3]) if row[3] else [],
"created_at": row[4].isoformat() if row[4] else "",
"updated_at": row[5].isoformat() if row[5] else ""
}
if tag_filter:
collection_tags = set(collection_data["tags"])
filter_tags = set(tag_filter)
if not filter_tags.intersection(collection_tags):
continue
collections.append(collection_data)
return collections
except Exception as e:
logger.error(f"Error listing collections: {e}")
raise
async def update_collection(self, user, collection, name=None, description=None, tags=None):
"""Update collection metadata"""
try:
resp = await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.get_collection_stmt, [user, collection]
)
if not resp:
raise RequestError(f"Collection {collection} not found")
row = resp.one()
current_name = row[1] or collection
current_description = row[2] or ""
current_tags = set(row[3]) if row[3] else set()
new_name = name if name is not None else current_name
new_description = description if description is not None else current_description
new_tags = set(tags) if tags is not None else current_tags
import datetime
now = datetime.datetime.now()
await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.update_collection_stmt,
[new_name, new_description, new_tags, now, user, collection]
)
return {
"user": user, "collection": collection, "name": new_name,
"description": new_description, "tags": list(new_tags),
"updated_at": now.isoformat()
}
except Exception as e:
logger.error(f"Error updating collection: {e}")
raise
async def delete_collection(self, user, collection):
"""Delete collection metadata record"""
try:
await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.delete_collection_stmt, [user, collection]
)
logger.debug(f"Deleted collection metadata for {user}/{collection}")
except Exception as e:
logger.error(f"Error deleting collection metadata: {e}")
raise
async def get_collection(self, user, collection):
"""Get collection metadata"""
try:
resp = await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.get_collection_stmt, [user, collection]
)
if not resp:
return None
row = resp.one()
return {
"user": user, "collection": row[0], "name": row[1] or row[0],
"description": row[2] or "", "tags": list(row[3]) if row[3] else [],
"created_at": row[4].isoformat() if row[4] else "",
"updated_at": row[5].isoformat() if row[5] else ""
}
except Exception as e:
logger.error(f"Error getting collection: {e}")
raise
async def create_collection(self, user, collection, name=None, description=None, tags=None):
"""Create a new collection metadata record"""
try:
import datetime
now = datetime.datetime.now()
# Set defaults for optional parameters
name = name if name is not None else collection
description = description if description is not None else ""
tags = tags if tags is not None else set()
await asyncio.get_event_loop().run_in_executor(
None, self.cassandra.execute, self.insert_collection_stmt,
[user, collection, name, description, tags, now, now]
)
logger.info(f"Created collection {user}/{collection}")
# Return the created collection data
return {
"user": user,
"collection": collection,
"name": name,
"description": description,
"tags": list(tags) if isinstance(tags, set) else tags,
"created_at": now.isoformat(),
"updated_at": now.isoformat()
}
except Exception as e:
logger.error(f"Error creating collection: {e}")
raise