Address legacy issues in storage management (#595)

* Removed legacy storage management cruft.  Tidied tech specs.

* Fix deletion of last collection

* Storage processor ignores data on the queue which is for a deleted collection

* Updated tests
This commit is contained in:
cybermaggedon 2026-01-05 13:45:14 +00:00 committed by GitHub
parent 25563bae3c
commit ae13190093
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 188 additions and 264 deletions

View file

@ -233,9 +233,13 @@ When a user initiates collection deletion through the librarian service:
#### Collection Management Interface
All store writers implement a standardized collection management interface with a common schema:
**⚠️ LEGACY APPROACH - REPLACED BY CONFIG-BASED PATTERN**
**Message Schema (`StorageManagementRequest`):**
The queue-based architecture described below has been replaced with a config-based approach using `CollectionConfigHandler`. All storage backends now receive collection updates via config push messages instead of dedicated management queues.
~~All store writers implement a standardized collection management interface with a common schema:~~
~~**Message Schema (`StorageManagementRequest`):**~~
```json
{
"operation": "create-collection" | "delete-collection",
@ -244,24 +248,26 @@ All store writers implement a standardized collection management interface with
}
```
**Queue Architecture:**
- **Vector Store Management Queue** (`vector-storage-management`): Vector/embedding stores
- **Object Store Management Queue** (`object-storage-management`): Object/document stores
- **Triple Store Management Queue** (`triples-storage-management`): Graph/RDF stores
- **Storage Response Queue** (`storage-management-response`): All responses sent here
~~**Queue Architecture:**~~
- ~~**Vector Store Management Queue** (`vector-storage-management`): Vector/embedding stores~~
- ~~**Object Store Management Queue** (`object-storage-management`): Object/document stores~~
- ~~**Triple Store Management Queue** (`triples-storage-management`): Graph/RDF stores~~
- ~~**Storage Response Queue** (`storage-management-response`): All responses sent here~~
Each store writer implements:
- **Collection Management Handler**: Processes `StorageManagementRequest` messages
- **Create Collection Operation**: Establishes collection in storage backend
- **Delete Collection Operation**: Removes all data associated with collection
- **Collection State Tracking**: Maintains knowledge of which collections exist
- **Message Processing**: Consumes from dedicated management queue
- **Status Reporting**: Returns success/failure via `StorageManagementResponse`
- **Idempotent Operations**: Safe to call create/delete multiple times
**Current Implementation:**
**Supported Operations:**
- `create-collection`: Create collection in storage backend
- `delete-collection`: Remove all collection data from storage backend
All storage backends now use `CollectionConfigHandler`:
- **Config Push Integration**: Storage services register for config push notifications
- **Automatic Synchronization**: Collections created/deleted based on config changes
- **Declarative Model**: Collections defined in config service, backends sync to match
- **No Request/Response**: Eliminates coordination overhead and response tracking
- **Collection State Tracking**: Maintained via `known_collections` cache
- **Idempotent Operations**: Safe to process same config multiple times
Each storage backend implements:
- `create_collection(user: str, collection: str, metadata: dict)` - Create collection structures
- `delete_collection(user: str, collection: str)` - Remove all collection data
- `collection_exists(user: str, collection: str) -> bool` - Validate before writes
#### Cassandra Triple Store Refactor
@ -365,62 +371,33 @@ Comprehensive testing will cover:
- `triples_collection` table for SPO queries and deletion tracking
- Collection deletion implemented with read-then-delete pattern
### 🔄 In Progress Components
### ✅ Migration to Config-Based Pattern - COMPLETED
1. **Collection Creation Broadcast** (`trustgraph-flow/trustgraph/librarian/collection_manager.py`)
- Update `update_collection()` to send "create-collection" to storage backends
- Wait for confirmations from all storage processors
- Handle creation failures appropriately
**All storage backends have been migrated from the queue-based pattern to the config-based `CollectionConfigHandler` pattern.**
2. **Document Submission Handler** (`trustgraph-flow/trustgraph/librarian/service.py` or similar)
- Check if collection exists when document submitted
- If not exists: Create collection with defaults before processing document
- Trigger same "create-collection" broadcast as `tg-set-collection`
- Ensure collection established before document flows to storage processors
Completed migrations:
- ✅ `trustgraph-flow/trustgraph/storage/triples/cassandra/write.py`
- ✅ `trustgraph-flow/trustgraph/storage/triples/neo4j/write.py`
- ✅ `trustgraph-flow/trustgraph/storage/triples/memgraph/write.py`
- ✅ `trustgraph-flow/trustgraph/storage/triples/falkordb/write.py`
- ✅ `trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py`
- ✅ `trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py`
- ✅ `trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py`
- ✅ `trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py`
- ✅ `trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py`
- ✅ `trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py`
- ✅ `trustgraph-flow/trustgraph/storage/objects/cassandra/write.py`
### ❌ Pending Components
All backends now:
- Inherit from `CollectionConfigHandler`
- Register for config push notifications via `self.register_config_handler(self.on_collection_config)`
- Implement `create_collection(user, collection, metadata)` and `delete_collection(user, collection)`
- Use `collection_exists(user, collection)` to validate before writes
- Automatically sync with config service changes
1. **Collection State Tracking** - Need to implement in each storage backend:
- **Cassandra Triples**: Use `triples_collection` table with marker triples
- **Neo4j/Memgraph/FalkorDB**: Create `:CollectionMetadata` nodes
- **Qdrant/Milvus/Pinecone**: Use native collection APIs
- **Cassandra Objects**: Add collection metadata tracking
2. **Storage Management Handlers** - Need "create-collection" support in 12 files:
- `trustgraph-flow/trustgraph/storage/triples/cassandra/write.py`
- `trustgraph-flow/trustgraph/storage/triples/neo4j/write.py`
- `trustgraph-flow/trustgraph/storage/triples/memgraph/write.py`
- `trustgraph-flow/trustgraph/storage/triples/falkordb/write.py`
- `trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py`
- `trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py`
- `trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py`
- `trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py`
- `trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py`
- `trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py`
- `trustgraph-flow/trustgraph/storage/objects/cassandra/write.py`
- Plus any other storage implementations
3. **Write Operation Validation** - Add collection existence checks to all `store_*` methods
4. **Query Operation Handling** - Update queries to return empty for non-existent collections
### Next Implementation Steps
**Phase 1: Core Infrastructure (2-3 days)**
1. Add collection state tracking methods to all storage backends
2. Implement `collection_exists()` and `create_collection()` methods
**Phase 2: Storage Handlers (1 week)**
3. Add "create-collection" handlers to all storage processors
4. Add write validation to reject non-existent collections
5. Update query handling for non-existent collections
**Phase 3: Collection Manager (2-3 days)**
6. Update collection_manager to broadcast creates
7. Implement response tracking and error handling
**Phase 4: Testing (3-5 days)**
8. End-to-end testing of explicit creation workflow
9. Test all storage backends
10. Validate error handling and edge cases
Legacy queue-based infrastructure removed:
- ✅ Removed `StorageManagementRequest` and `StorageManagementResponse` schemas
- ✅ Removed storage management queue topic definitions
- ✅ Removed storage management consumer/producer from all backends
- ✅ Removed `on_storage_management` handlers from all backends

View file

@ -62,19 +62,20 @@ When tenant A starts flow `tenant-a-prod` and tenant B starts flow `tenant-b-pro
- **Impact:** Config, cores, and librarian services
- **Blocks:** Multiple tenants cannot use separate Cassandra keyspaces
### Issue #4: Collection Management Architecture
- **Current:** Collections stored in Cassandra librarian keyspace via separate collections table
- **Current:** Librarian uses 4 hardcoded storage management topics to coordinate collection create/delete:
### Issue #4: Collection Management Architecture ✅ COMPLETED
- **Previous:** Collections stored in Cassandra librarian keyspace via separate collections table
- **Previous:** Librarian used 4 hardcoded storage management topics to coordinate collection create/delete:
- `vector_storage_management_topic`
- `object_storage_management_topic`
- `triples_storage_management_topic`
- `storage_management_response_topic`
- **Problems:**
- Hardcoded topics cannot be customized for multi-tenant deployments
- **Problems (Resolved):**
- Hardcoded topics could not be customized for multi-tenant deployments
- Complex async coordination between librarian and 4+ storage services
- Separate Cassandra table and management infrastructure
- Non-persistent request/response queues for critical operations
- **Solution:** Migrate collections to config service storage, use config push for distribution
- **Solution Implemented:** Migrated collections to config service storage, use config push for distribution
- **Status:** All storage backends migrated to `CollectionConfigHandler` pattern
## Solution
@ -448,7 +449,9 @@ async def delete_collection(self, user, collection):
- Breaking change acceptable - no data migration needed
- Simplifies librarian service significantly
#### Change 10: Storage Services - Config-Based Collection Management
#### Change 10: Storage Services - Config-Based Collection Management ✅ COMPLETED
**Status:** All 11 storage backends have been migrated to use `CollectionConfigHandler`.
**Affected Services (11 total):**
- Document embeddings: milvus, pinecone, qdrant
@ -708,9 +711,9 @@ All services inheriting from AsyncProcessor or FlowProcessor:
- tables/library.py (collections table removal)
- schema/services/collection.py (timestamp removal)
**Deferred Changes (Change 10):**
- All storage services (11 total) - will subscribe to config push for collection updates
- Storage management schema (potentially removable if unused elsewhere)
**Completed Changes (Change 10):** ✅
- All storage services (11 total) - migrated to config push for collection updates via `CollectionConfigHandler`
- Storage management schema removed from `storage.py`
## Future Considerations
@ -749,10 +752,11 @@ Some services use **per-user keyspaces** dynamically, where each user gets their
- Update collection schema (remove timestamps)
- **Outcome:** Eliminates hardcoded storage management topics, simplifies librarian
### Phase 3: Storage Service Updates (Change 10) - Deferred
- Update all storage services to use config push for collections
- Remove storage management request/response infrastructure
- **Outcome:** Complete config-based collection management
### Phase 3: Storage Service Updates (Change 10) ✅ COMPLETED
- Updated all storage services to use config push for collections via `CollectionConfigHandler`
- Removed storage management request/response infrastructure
- Removed legacy schema definitions
- **Outcome:** Complete config-based collection management achieved
## References
- GitHub Issue: https://github.com/trustgraph-ai/trustgraph/issues/582

View file

@ -117,7 +117,7 @@ class TestObjectsCassandraIntegration:
assert "customer_records" in processor.schemas
# Step 1.5: Create the collection first (simulate tg-set-collection)
await processor.create_collection("test_user", "import_2024")
await processor.create_collection("test_user", "import_2024", {})
# Step 2: Process an ExtractedObject
test_obj = ExtractedObject(
@ -213,8 +213,8 @@ class TestObjectsCassandraIntegration:
assert len(processor.schemas) == 2
# Create collections first
await processor.create_collection("shop", "catalog")
await processor.create_collection("shop", "sales")
await processor.create_collection("shop", "catalog", {})
await processor.create_collection("shop", "sales", {})
# Process objects for different schemas
product_obj = ExtractedObject(
@ -263,7 +263,7 @@ class TestObjectsCassandraIntegration:
)
# Create collection first
await processor.create_collection("test", "test")
await processor.create_collection("test", "test", {})
# Create object missing required field
test_obj = ExtractedObject(
@ -302,7 +302,7 @@ class TestObjectsCassandraIntegration:
)
# Create collection first
await processor.create_collection("logger", "app_events")
await processor.create_collection("logger", "app_events", {})
# Process object
test_obj = ExtractedObject(
@ -407,7 +407,7 @@ class TestObjectsCassandraIntegration:
# Create all collections first
for coll in collections:
await processor.create_collection("analytics", coll)
await processor.create_collection("analytics", coll, {})
for coll in collections:
obj = ExtractedObject(
@ -486,7 +486,7 @@ class TestObjectsCassandraIntegration:
)
# Create collection first
await processor.create_collection("test_user", "batch_import")
await processor.create_collection("test_user", "batch_import", {})
msg = MagicMock()
msg.value.return_value = batch_obj
@ -532,7 +532,7 @@ class TestObjectsCassandraIntegration:
)
# Create collection first
await processor.create_collection("test", "empty")
await processor.create_collection("test", "empty", {})
# Process empty batch object
empty_obj = ExtractedObject(
@ -573,7 +573,7 @@ class TestObjectsCassandraIntegration:
)
# Create collection first
await processor.create_collection("test", "mixed")
await processor.create_collection("test", "mixed", {})
# Single object (backward compatibility)
single_obj = ExtractedObject(

View file

@ -78,7 +78,10 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase):
}
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('test_user', 'test_collection')] = {}
# Create mock message with chunks and vectors
mock_message = MagicMock()
mock_message.metadata.user = 'test_user'
@ -129,7 +132,10 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase):
}
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('multi_user', 'multi_collection')] = {}
# Create mock message with multiple chunks
mock_message = MagicMock()
mock_message.metadata.user = 'multi_user'
@ -186,7 +192,10 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase):
}
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('vector_user', 'vector_collection')] = {}
# Create mock message with chunk having multiple vectors
mock_message = MagicMock()
mock_message.metadata.user = 'vector_user'
@ -280,6 +289,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase):
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('new_user', 'new_collection')] = {}
# Create mock message
mock_message = MagicMock()
mock_message.metadata.user = 'new_user'
@ -329,6 +341,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase):
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('error_user', 'error_collection')] = {}
# Create mock message
mock_message = MagicMock()
mock_message.metadata.user = 'error_user'
@ -364,6 +379,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase):
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('cache_user', 'cache_collection')] = {}
# Create first mock message
mock_message1 = MagicMock()
mock_message1.metadata.user = 'cache_user'
@ -425,6 +443,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase):
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('dim_user', 'dim_collection')] = {}
# Create mock message with different dimension vectors
mock_message = MagicMock()
mock_message.metadata.user = 'dim_user'
@ -494,7 +515,10 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase):
}
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('utf8_user', 'utf8_collection')] = {}
# Create mock message with UTF-8 encoded text
mock_message = MagicMock()
mock_message.metadata.user = 'utf8_user'
@ -533,7 +557,10 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase):
}
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('decode_user', 'decode_collection')] = {}
# Create mock message with decode error
mock_message = MagicMock()
mock_message.metadata.user = 'decode_user'

View file

@ -57,7 +57,10 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase):
}
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('test_user', 'test_collection')] = {}
# Create mock message with entities and vectors
mock_message = MagicMock()
mock_message.metadata.user = 'test_user'
@ -107,7 +110,10 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase):
}
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('multi_user', 'multi_collection')] = {}
# Create mock message with multiple entities
mock_message = MagicMock()
mock_message.metadata.user = 'multi_user'
@ -163,7 +169,10 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase):
}
processor = Processor(**config)
# Add collection to known_collections (simulates config push)
processor.known_collections[('vector_user', 'vector_collection')] = {}
# Create mock message with entity having multiple vectors
mock_message = MagicMock()
mock_message.metadata.user = 'vector_user'

View file

@ -35,12 +35,8 @@ class CollectionConfigHandler:
"""
logger.info(f"Processing collection configuration (version {version})")
# Extract collections from config
if "collection" not in config:
logger.debug("No collection configuration in config push")
return
collection_config = config["collection"]
# Extract collections from config (treat missing key as empty)
collection_config = config.get("collection", {})
# Track which collections we've seen in this config
current_collections: Set[tuple] = set()
@ -81,10 +77,15 @@ class CollectionConfigHandler:
for user, collection in deleted_collections:
logger.info(f"Collection deleted: {user}/{collection}")
try:
await self.delete_collection(user, collection)
# Remove from known_collections FIRST to immediately reject new writes
# This eliminates race condition with worker threads
del self.known_collections[(user, collection)]
# Physical deletion happens after - worker threads already rejecting writes
await self.delete_collection(user, collection)
except Exception as e:
logger.error(f"Error deleting collection {user}/{collection}: {e}", exc_info=True)
# If physical deletion failed, should we re-add to known_collections?
# For now, keep it removed - collection is logically deleted per config
logger.debug(f"Collection config processing complete. Known collections: {len(self.known_collections)}")

View file

@ -1,45 +1,8 @@
from dataclasses import dataclass
from ..core.primitives import Error
from ..core.topic import topic
############################################################################
# Storage management operations
@dataclass
class StorageManagementRequest:
"""Request for storage management operations sent to store processors"""
operation: str = "" # e.g., "delete-collection"
user: str = ""
collection: str = ""
@dataclass
class StorageManagementResponse:
"""Response from storage processors for management operations"""
error: Error | None = None # Only populated if there's an error, if null success
############################################################################
# Storage management topics
# Topics for sending collection management requests to different storage types
vector_storage_management_topic = topic(
'vector-storage-management', qos='q0', namespace='request'
)
object_storage_management_topic = topic(
'object-storage-management', qos='q0', namespace='request'
)
triples_storage_management_topic = topic(
'triples-storage-management', qos='q0', namespace='request'
)
# Topic for receiving responses from storage processors
storage_management_response_topic = topic(
'storage-management', qos='q0', namespace='response'
)
############################################################################
# This file previously contained legacy storage management queue definitions
# (StorageManagementRequest, StorageManagementResponse, and related topics).
#
# These have been removed as collection management now uses a config-based
# approach via CollectionConfigHandler instead of request/response queues.
#
# This file is kept for potential future storage-related schema definitions.

View file

@ -90,6 +90,15 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
async def store_document_embeddings(self, message):
# Validate collection exists in config before processing
if not self.collection_exists(message.metadata.user, message.metadata.collection):
logger.warning(
f"Collection {message.metadata.collection} for user {message.metadata.user} "
f"does not exist in config (likely deleted while data was in-flight). "
f"Dropping message."
)
return
for emb in message.chunks:
if emb.chunk is None or emb.chunk == b"": continue
@ -105,7 +114,7 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
f"d-{message.metadata.user}-{message.metadata.collection}-{dim}"
)
# Lazily create index if it doesn't exist
# Lazily create index if it doesn't exist (but only if authorized in config)
if not self.pinecone.has_index(index_name):
logger.info(f"Lazily creating Pinecone index {index_name} with dimension {dim}")
self.create_index(index_name, dim)

View file

@ -41,6 +41,15 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
async def store_document_embeddings(self, message):
# Validate collection exists in config before processing
if not self.collection_exists(message.metadata.user, message.metadata.collection):
logger.warning(
f"Collection {message.metadata.collection} for user {message.metadata.user} "
f"does not exist in config (likely deleted while data was in-flight). "
f"Dropping message."
)
return
for emb in message.chunks:
chunk = emb.chunk.decode("utf-8")
@ -54,7 +63,7 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
f"d_{message.metadata.user}_{message.metadata.collection}_{dim}"
)
# Lazily create collection if it doesn't exist
# Lazily create collection if it doesn't exist (but only if authorized in config)
if not self.qdrant.collection_exists(collection):
logger.info(f"Lazily creating Qdrant collection {collection} with dimension {dim}")
self.qdrant.create_collection(

View file

@ -90,6 +90,15 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
async def store_graph_embeddings(self, message):
# Validate collection exists in config before processing
if not self.collection_exists(message.metadata.user, message.metadata.collection):
logger.warning(
f"Collection {message.metadata.collection} for user {message.metadata.user} "
f"does not exist in config (likely deleted while data was in-flight). "
f"Dropping message."
)
return
for entity in message.entities:
if entity.entity.value == "" or entity.entity.value is None:
@ -103,7 +112,7 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
f"t-{message.metadata.user}-{message.metadata.collection}-{dim}"
)
# Lazily create index if it doesn't exist
# Lazily create index if it doesn't exist (but only if authorized in config)
if not self.pinecone.has_index(index_name):
logger.info(f"Lazily creating Pinecone index {index_name} with dimension {dim}")
self.create_index(index_name, dim)

View file

@ -41,6 +41,15 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
async def store_graph_embeddings(self, message):
# Validate collection exists in config before processing
if not self.collection_exists(message.metadata.user, message.metadata.collection):
logger.warning(
f"Collection {message.metadata.collection} for user {message.metadata.user} "
f"does not exist in config (likely deleted while data was in-flight). "
f"Dropping message."
)
return
for entity in message.entities:
if entity.entity.value == "" or entity.entity.value is None: return
@ -53,7 +62,7 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
f"t_{message.metadata.user}_{message.metadata.collection}_{dim}"
)
# Lazily create collection if it doesn't exist
# Lazily create collection if it doesn't exist (but only if authorized in config)
if not self.qdrant.collection_exists(collection):
logger.info(f"Lazily creating Qdrant collection {collection} with dimension {dim}")
self.qdrant.create_collection(

View file

@ -13,9 +13,8 @@ from cassandra import ConsistencyLevel
from .... schema import ExtractedObject
from .... schema import RowSchema, Field
from .... schema import StorageManagementRequest, StorageManagementResponse
from .... schema import object_storage_management_topic, storage_management_response_topic
from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
from .... base import CollectionConfigHandler
from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config
# Module logger
@ -23,7 +22,7 @@ logger = logging.getLogger(__name__)
default_ident = "objects-write"
class Processor(FlowProcessor):
class Processor(CollectionConfigHandler, FlowProcessor):
def __init__(self, **params):
@ -64,39 +63,9 @@ class Processor(FlowProcessor):
)
)
# Set up storage management consumer and producer directly
# (FlowProcessor doesn't support topic-based specs outside of flows)
from .... base import Consumer, Producer, ConsumerMetrics, ProducerMetrics
storage_request_metrics = ConsumerMetrics(
processor=self.id, flow=None, name="storage-request"
)
storage_response_metrics = ProducerMetrics(
processor=self.id, flow=None, name="storage-response"
)
# Create storage management consumer
self.storage_request_consumer = Consumer(
taskgroup=self.taskgroup,
backend=self.pubsub,
flow=None,
topic=object_storage_management_topic,
subscriber=f"{id}-storage",
schema=StorageManagementRequest,
handler=self.on_storage_management,
metrics=storage_request_metrics,
)
# Create storage management response producer
self.storage_response_producer = Producer(
backend=self.pubsub,
topic=storage_management_response_topic,
schema=StorageManagementResponse,
metrics=storage_response_metrics,
)
# Register config handler for schema updates
# Register config handlers
self.register_config_handler(self.on_schema_config)
self.register_config_handler(self.on_collection_config)
# Cache of known keyspaces/tables
self.known_keyspaces: Set[str] = set()
@ -347,28 +316,14 @@ class Processor(FlowProcessor):
obj = msg.value()
logger.info(f"Storing {len(obj.values)} objects for schema {obj.schema_name} from {obj.metadata.id}")
# Validate collection/keyspace exists before accepting writes
safe_keyspace = self.sanitize_name(obj.metadata.user)
if safe_keyspace not in self.known_keyspaces:
# Check if keyspace actually exists in Cassandra
self.connect_cassandra()
check_keyspace_cql = """
SELECT keyspace_name FROM system_schema.keyspaces
WHERE keyspace_name = %s
"""
result = self.session.execute(check_keyspace_cql, (safe_keyspace,))
# Check if result is None (mock case) or has no rows
if result is None or not result.one():
error_msg = (
f"Collection {obj.metadata.collection} does not exist. "
f"Create it first via collection management API."
)
logger.error(error_msg)
raise ValueError(error_msg)
# Cache it if it exists
self.known_keyspaces.add(safe_keyspace)
if safe_keyspace not in self.known_tables:
self.known_tables[safe_keyspace] = set()
# Validate collection exists before accepting writes
if not self.collection_exists(obj.metadata.user, obj.metadata.collection):
error_msg = (
f"Collection {obj.metadata.collection} does not exist. "
f"Create it first via collection management API."
)
logger.error(error_msg)
raise ValueError(error_msg)
# Get schema definition
schema = self.schemas.get(obj.schema_name)
@ -447,55 +402,7 @@ class Processor(FlowProcessor):
logger.error(f"Failed to insert object {obj_index}: {e}", exc_info=True)
raise
async def on_storage_management(self, msg, consumer, flow):
"""Handle storage management requests for collection operations"""
request = msg.value()
logger.info(f"Received storage management request: {request.operation} for {request.user}/{request.collection}")
try:
if request.operation == "create-collection":
await self.create_collection(request.user, request.collection)
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully created collection {request.user}/{request.collection}")
elif request.operation == "delete-collection":
await self.delete_collection(request.user, request.collection)
# Send success response
response = StorageManagementResponse(
error=None # No error means success
)
await self.storage_response_producer.send(response)
logger.info(f"Successfully deleted collection {request.user}/{request.collection}")
else:
logger.warning(f"Unknown storage management operation: {request.operation}")
# Send error response
from .... schema import Error
response = StorageManagementResponse(
error=Error(
type="unknown_operation",
message=f"Unknown operation: {request.operation}"
)
)
await self.storage_response_producer.send(response)
except Exception as e:
logger.error(f"Error handling storage management request: {e}", exc_info=True)
# Send error response
from .... schema import Error
response = StorageManagementResponse(
error=Error(
type="processing_error",
message=str(e)
)
)
await self.storage_response_producer.send(response)
async def create_collection(self, user: str, collection: str):
async def create_collection(self, user: str, collection: str, metadata: dict):
"""Create/verify collection exists in Cassandra object store"""
# Connect if not already connected
self.connect_cassandra()