diff --git a/docs/tech-specs/collection-management.md b/docs/tech-specs/collection-management.md index ffc5f63f..542abdd0 100644 --- a/docs/tech-specs/collection-management.md +++ b/docs/tech-specs/collection-management.md @@ -233,9 +233,13 @@ When a user initiates collection deletion through the librarian service: #### Collection Management Interface -All store writers implement a standardized collection management interface with a common schema: +**⚠️ LEGACY APPROACH - REPLACED BY CONFIG-BASED PATTERN** -**Message Schema (`StorageManagementRequest`):** +The queue-based architecture described below has been replaced with a config-based approach using `CollectionConfigHandler`. All storage backends now receive collection updates via config push messages instead of dedicated management queues. + +~~All store writers implement a standardized collection management interface with a common schema:~~ + +~~**Message Schema (`StorageManagementRequest`):**~~ ```json { "operation": "create-collection" | "delete-collection", @@ -244,24 +248,26 @@ All store writers implement a standardized collection management interface with } ``` -**Queue Architecture:** -- **Vector Store Management Queue** (`vector-storage-management`): Vector/embedding stores -- **Object Store Management Queue** (`object-storage-management`): Object/document stores -- **Triple Store Management Queue** (`triples-storage-management`): Graph/RDF stores -- **Storage Response Queue** (`storage-management-response`): All responses sent here +~~**Queue Architecture:**~~ +- ~~**Vector Store Management Queue** (`vector-storage-management`): Vector/embedding stores~~ +- ~~**Object Store Management Queue** (`object-storage-management`): Object/document stores~~ +- ~~**Triple Store Management Queue** (`triples-storage-management`): Graph/RDF stores~~ +- ~~**Storage Response Queue** (`storage-management-response`): All responses sent here~~ -Each store writer implements: -- **Collection Management Handler**: Processes 
`StorageManagementRequest` messages -- **Create Collection Operation**: Establishes collection in storage backend -- **Delete Collection Operation**: Removes all data associated with collection -- **Collection State Tracking**: Maintains knowledge of which collections exist -- **Message Processing**: Consumes from dedicated management queue -- **Status Reporting**: Returns success/failure via `StorageManagementResponse` -- **Idempotent Operations**: Safe to call create/delete multiple times +**Current Implementation:** -**Supported Operations:** -- `create-collection`: Create collection in storage backend -- `delete-collection`: Remove all collection data from storage backend +All storage backends now use `CollectionConfigHandler`: +- **Config Push Integration**: Storage services register for config push notifications +- **Automatic Synchronization**: Collections created/deleted based on config changes +- **Declarative Model**: Collections defined in config service, backends sync to match +- **No Request/Response**: Eliminates coordination overhead and response tracking +- **Collection State Tracking**: Maintained via `known_collections` cache +- **Idempotent Operations**: Safe to process same config multiple times + +Each storage backend implements: +- `create_collection(user: str, collection: str, metadata: dict)` - Create collection structures +- `delete_collection(user: str, collection: str)` - Remove all collection data +- `collection_exists(user: str, collection: str) -> bool` - Validate before writes #### Cassandra Triple Store Refactor @@ -365,62 +371,33 @@ Comprehensive testing will cover: - `triples_collection` table for SPO queries and deletion tracking - Collection deletion implemented with read-then-delete pattern -### 🔄 In Progress Components +### ✅ Migration to Config-Based Pattern - COMPLETED -1. 
**Collection Creation Broadcast** (`trustgraph-flow/trustgraph/librarian/collection_manager.py`) - - Update `update_collection()` to send "create-collection" to storage backends - - Wait for confirmations from all storage processors - - Handle creation failures appropriately +**All storage backends have been migrated from the queue-based pattern to the config-based `CollectionConfigHandler` pattern.** -2. **Document Submission Handler** (`trustgraph-flow/trustgraph/librarian/service.py` or similar) - - Check if collection exists when document submitted - - If not exists: Create collection with defaults before processing document - - Trigger same "create-collection" broadcast as `tg-set-collection` - - Ensure collection established before document flows to storage processors +Completed migrations: +- ✅ `trustgraph-flow/trustgraph/storage/triples/cassandra/write.py` +- ✅ `trustgraph-flow/trustgraph/storage/triples/neo4j/write.py` +- ✅ `trustgraph-flow/trustgraph/storage/triples/memgraph/write.py` +- ✅ `trustgraph-flow/trustgraph/storage/triples/falkordb/write.py` +- ✅ `trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py` +- ✅ `trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py` +- ✅ `trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py` +- ✅ `trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py` +- ✅ `trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py` +- ✅ `trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py` +- ✅ `trustgraph-flow/trustgraph/storage/objects/cassandra/write.py` -### ❌ Pending Components +All backends now: +- Inherit from `CollectionConfigHandler` +- Register for config push notifications via `self.register_config_handler(self.on_collection_config)` +- Implement `create_collection(user, collection, metadata)` and `delete_collection(user, collection)` +- Use `collection_exists(user, collection)` to validate before writes +- Automatically sync with config 
service changes -1. **Collection State Tracking** - Need to implement in each storage backend: - - **Cassandra Triples**: Use `triples_collection` table with marker triples - - **Neo4j/Memgraph/FalkorDB**: Create `:CollectionMetadata` nodes - - **Qdrant/Milvus/Pinecone**: Use native collection APIs - - **Cassandra Objects**: Add collection metadata tracking - -2. **Storage Management Handlers** - Need "create-collection" support in 12 files: - - `trustgraph-flow/trustgraph/storage/triples/cassandra/write.py` - - `trustgraph-flow/trustgraph/storage/triples/neo4j/write.py` - - `trustgraph-flow/trustgraph/storage/triples/memgraph/write.py` - - `trustgraph-flow/trustgraph/storage/triples/falkordb/write.py` - - `trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py` - - `trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py` - - `trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py` - - `trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py` - - `trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py` - - `trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py` - - `trustgraph-flow/trustgraph/storage/objects/cassandra/write.py` - - Plus any other storage implementations - -3. **Write Operation Validation** - Add collection existence checks to all `store_*` methods - -4. **Query Operation Handling** - Update queries to return empty for non-existent collections - -### Next Implementation Steps - -**Phase 1: Core Infrastructure (2-3 days)** -1. Add collection state tracking methods to all storage backends -2. Implement `collection_exists()` and `create_collection()` methods - -**Phase 2: Storage Handlers (1 week)** -3. Add "create-collection" handlers to all storage processors -4. Add write validation to reject non-existent collections -5. Update query handling for non-existent collections - -**Phase 3: Collection Manager (2-3 days)** -6. Update collection_manager to broadcast creates -7. 
Implement response tracking and error handling - -**Phase 4: Testing (3-5 days)** -8. End-to-end testing of explicit creation workflow -9. Test all storage backends -10. Validate error handling and edge cases +Legacy queue-based infrastructure removed: +- ✅ Removed `StorageManagementRequest` and `StorageManagementResponse` schemas +- ✅ Removed storage management queue topic definitions +- ✅ Removed storage management consumer/producer from all backends +- ✅ Removed `on_storage_management` handlers from all backends diff --git a/docs/tech-specs/multi-tenant-support.md b/docs/tech-specs/multi-tenant-support.md index bfbad543..dc0555c1 100644 --- a/docs/tech-specs/multi-tenant-support.md +++ b/docs/tech-specs/multi-tenant-support.md @@ -62,19 +62,20 @@ When tenant A starts flow `tenant-a-prod` and tenant B starts flow `tenant-b-pro - **Impact:** Config, cores, and librarian services - **Blocks:** Multiple tenants cannot use separate Cassandra keyspaces -### Issue #4: Collection Management Architecture -- **Current:** Collections stored in Cassandra librarian keyspace via separate collections table -- **Current:** Librarian uses 4 hardcoded storage management topics to coordinate collection create/delete: +### Issue #4: Collection Management Architecture ✅ COMPLETED +- **Previous:** Collections stored in Cassandra librarian keyspace via separate collections table +- **Previous:** Librarian used 4 hardcoded storage management topics to coordinate collection create/delete: - `vector_storage_management_topic` - `object_storage_management_topic` - `triples_storage_management_topic` - `storage_management_response_topic` -- **Problems:** - - Hardcoded topics cannot be customized for multi-tenant deployments +- **Problems (Resolved):** + - Hardcoded topics could not be customized for multi-tenant deployments - Complex async coordination between librarian and 4+ storage services - Separate Cassandra table and management infrastructure - Non-persistent request/response queues 
for critical operations -- **Solution:** Migrate collections to config service storage, use config push for distribution +- **Solution Implemented:** Migrated collections to config service storage, use config push for distribution +- **Status:** All storage backends migrated to `CollectionConfigHandler` pattern ## Solution @@ -448,7 +449,9 @@ async def delete_collection(self, user, collection): - Breaking change acceptable - no data migration needed - Simplifies librarian service significantly -#### Change 10: Storage Services - Config-Based Collection Management +#### Change 10: Storage Services - Config-Based Collection Management ✅ COMPLETED + +**Status:** All 11 storage backends have been migrated to use `CollectionConfigHandler`. **Affected Services (11 total):** - Document embeddings: milvus, pinecone, qdrant @@ -708,9 +711,9 @@ All services inheriting from AsyncProcessor or FlowProcessor: - tables/library.py (collections table removal) - schema/services/collection.py (timestamp removal) -**Deferred Changes (Change 10):** -- All storage services (11 total) - will subscribe to config push for collection updates -- Storage management schema (potentially removable if unused elsewhere) +**Completed Changes (Change 10):** ✅ +- All storage services (11 total) - migrated to config push for collection updates via `CollectionConfigHandler` +- Storage management schema removed from `storage.py` ## Future Considerations @@ -749,10 +752,11 @@ Some services use **per-user keyspaces** dynamically, where each user gets their - Update collection schema (remove timestamps) - **Outcome:** Eliminates hardcoded storage management topics, simplifies librarian -### Phase 3: Storage Service Updates (Change 10) - Deferred -- Update all storage services to use config push for collections -- Remove storage management request/response infrastructure -- **Outcome:** Complete config-based collection management +### Phase 3: Storage Service Updates (Change 10) ✅ COMPLETED +- Updated all 
storage services to use config push for collections via `CollectionConfigHandler` +- Removed storage management request/response infrastructure +- Removed legacy schema definitions +- **Outcome:** Complete config-based collection management achieved ## References - GitHub Issue: https://github.com/trustgraph-ai/trustgraph/issues/582 diff --git a/tests/integration/test_objects_cassandra_integration.py b/tests/integration/test_objects_cassandra_integration.py index 21b414c1..3310b396 100644 --- a/tests/integration/test_objects_cassandra_integration.py +++ b/tests/integration/test_objects_cassandra_integration.py @@ -117,7 +117,7 @@ class TestObjectsCassandraIntegration: assert "customer_records" in processor.schemas # Step 1.5: Create the collection first (simulate tg-set-collection) - await processor.create_collection("test_user", "import_2024") + await processor.create_collection("test_user", "import_2024", {}) # Step 2: Process an ExtractedObject test_obj = ExtractedObject( @@ -213,8 +213,8 @@ class TestObjectsCassandraIntegration: assert len(processor.schemas) == 2 # Create collections first - await processor.create_collection("shop", "catalog") - await processor.create_collection("shop", "sales") + await processor.create_collection("shop", "catalog", {}) + await processor.create_collection("shop", "sales", {}) # Process objects for different schemas product_obj = ExtractedObject( @@ -263,7 +263,7 @@ class TestObjectsCassandraIntegration: ) # Create collection first - await processor.create_collection("test", "test") + await processor.create_collection("test", "test", {}) # Create object missing required field test_obj = ExtractedObject( @@ -302,7 +302,7 @@ class TestObjectsCassandraIntegration: ) # Create collection first - await processor.create_collection("logger", "app_events") + await processor.create_collection("logger", "app_events", {}) # Process object test_obj = ExtractedObject( @@ -407,7 +407,7 @@ class TestObjectsCassandraIntegration: # Create all 
collections first for coll in collections: - await processor.create_collection("analytics", coll) + await processor.create_collection("analytics", coll, {}) for coll in collections: obj = ExtractedObject( @@ -486,7 +486,7 @@ class TestObjectsCassandraIntegration: ) # Create collection first - await processor.create_collection("test_user", "batch_import") + await processor.create_collection("test_user", "batch_import", {}) msg = MagicMock() msg.value.return_value = batch_obj @@ -532,7 +532,7 @@ class TestObjectsCassandraIntegration: ) # Create collection first - await processor.create_collection("test", "empty") + await processor.create_collection("test", "empty", {}) # Process empty batch object empty_obj = ExtractedObject( @@ -573,7 +573,7 @@ class TestObjectsCassandraIntegration: ) # Create collection first - await processor.create_collection("test", "mixed") + await processor.create_collection("test", "mixed", {}) # Single object (backward compatibility) single_obj = ExtractedObject( diff --git a/tests/unit/test_storage/test_doc_embeddings_qdrant_storage.py b/tests/unit/test_storage/test_doc_embeddings_qdrant_storage.py index 3698526a..fc839482 100644 --- a/tests/unit/test_storage/test_doc_embeddings_qdrant_storage.py +++ b/tests/unit/test_storage/test_doc_embeddings_qdrant_storage.py @@ -78,7 +78,10 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): } processor = Processor(**config) - + + # Add collection to known_collections (simulates config push) + processor.known_collections[('test_user', 'test_collection')] = {} + # Create mock message with chunks and vectors mock_message = MagicMock() mock_message.metadata.user = 'test_user' @@ -129,7 +132,10 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): } processor = Processor(**config) - + + # Add collection to known_collections (simulates config push) + processor.known_collections[('multi_user', 'multi_collection')] = {} + # Create mock message with multiple chunks mock_message = 
MagicMock() mock_message.metadata.user = 'multi_user' @@ -186,7 +192,10 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): } processor = Processor(**config) - + + # Add collection to known_collections (simulates config push) + processor.known_collections[('vector_user', 'vector_collection')] = {} + # Create mock message with chunk having multiple vectors mock_message = MagicMock() mock_message.metadata.user = 'vector_user' @@ -280,6 +289,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): processor = Processor(**config) + # Add collection to known_collections (simulates config push) + processor.known_collections[('new_user', 'new_collection')] = {} + # Create mock message mock_message = MagicMock() mock_message.metadata.user = 'new_user' @@ -329,6 +341,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): processor = Processor(**config) + # Add collection to known_collections (simulates config push) + processor.known_collections[('error_user', 'error_collection')] = {} + # Create mock message mock_message = MagicMock() mock_message.metadata.user = 'error_user' @@ -364,6 +379,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): processor = Processor(**config) + # Add collection to known_collections (simulates config push) + processor.known_collections[('cache_user', 'cache_collection')] = {} + # Create first mock message mock_message1 = MagicMock() mock_message1.metadata.user = 'cache_user' @@ -425,6 +443,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): processor = Processor(**config) + # Add collection to known_collections (simulates config push) + processor.known_collections[('dim_user', 'dim_collection')] = {} + # Create mock message with different dimension vectors mock_message = MagicMock() mock_message.metadata.user = 'dim_user' @@ -494,7 +515,10 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): } processor = Processor(**config) - + + # Add collection to 
known_collections (simulates config push) + processor.known_collections[('utf8_user', 'utf8_collection')] = {} + # Create mock message with UTF-8 encoded text mock_message = MagicMock() mock_message.metadata.user = 'utf8_user' @@ -533,7 +557,10 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): } processor = Processor(**config) - + + # Add collection to known_collections (simulates config push) + processor.known_collections[('decode_user', 'decode_collection')] = {} + # Create mock message with decode error mock_message = MagicMock() mock_message.metadata.user = 'decode_user' diff --git a/tests/unit/test_storage/test_graph_embeddings_qdrant_storage.py b/tests/unit/test_storage/test_graph_embeddings_qdrant_storage.py index c909a1fb..d240b892 100644 --- a/tests/unit/test_storage/test_graph_embeddings_qdrant_storage.py +++ b/tests/unit/test_storage/test_graph_embeddings_qdrant_storage.py @@ -57,7 +57,10 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase): } processor = Processor(**config) - + + # Add collection to known_collections (simulates config push) + processor.known_collections[('test_user', 'test_collection')] = {} + # Create mock message with entities and vectors mock_message = MagicMock() mock_message.metadata.user = 'test_user' @@ -107,7 +110,10 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase): } processor = Processor(**config) - + + # Add collection to known_collections (simulates config push) + processor.known_collections[('multi_user', 'multi_collection')] = {} + # Create mock message with multiple entities mock_message = MagicMock() mock_message.metadata.user = 'multi_user' @@ -163,7 +169,10 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase): } processor = Processor(**config) - + + # Add collection to known_collections (simulates config push) + processor.known_collections[('vector_user', 'vector_collection')] = {} + # Create mock message with entity having multiple vectors mock_message = 
MagicMock() mock_message.metadata.user = 'vector_user' diff --git a/trustgraph-base/trustgraph/base/collection_config_handler.py b/trustgraph-base/trustgraph/base/collection_config_handler.py index 2d752b3c..8c1af822 100644 --- a/trustgraph-base/trustgraph/base/collection_config_handler.py +++ b/trustgraph-base/trustgraph/base/collection_config_handler.py @@ -35,12 +35,8 @@ class CollectionConfigHandler: """ logger.info(f"Processing collection configuration (version {version})") - # Extract collections from config - if "collection" not in config: - logger.debug("No collection configuration in config push") - return - - collection_config = config["collection"] + # Extract collections from config (treat missing key as empty) + collection_config = config.get("collection", {}) # Track which collections we've seen in this config current_collections: Set[tuple] = set() @@ -81,10 +77,15 @@ class CollectionConfigHandler: for user, collection in deleted_collections: logger.info(f"Collection deleted: {user}/{collection}") try: - await self.delete_collection(user, collection) + # Remove from known_collections FIRST to immediately reject new writes + # This eliminates race condition with worker threads del self.known_collections[(user, collection)] + # Physical deletion happens after - worker threads already rejecting writes + await self.delete_collection(user, collection) except Exception as e: logger.error(f"Error deleting collection {user}/{collection}: {e}", exc_info=True) + # If physical deletion failed, should we re-add to known_collections? + # For now, keep it removed - collection is logically deleted per config logger.debug(f"Collection config processing complete. 
Known collections: {len(self.known_collections)}") diff --git a/trustgraph-base/trustgraph/schema/services/storage.py b/trustgraph-base/trustgraph/schema/services/storage.py index e65fb793..b010e54b 100644 --- a/trustgraph-base/trustgraph/schema/services/storage.py +++ b/trustgraph-base/trustgraph/schema/services/storage.py @@ -1,45 +1,8 @@ -from dataclasses import dataclass - -from ..core.primitives import Error -from ..core.topic import topic - -############################################################################ - -# Storage management operations - -@dataclass -class StorageManagementRequest: - """Request for storage management operations sent to store processors""" - operation: str = "" # e.g., "delete-collection" - user: str = "" - collection: str = "" - -@dataclass -class StorageManagementResponse: - """Response from storage processors for management operations""" - error: Error | None = None # Only populated if there's an error, if null success - -############################################################################ - -# Storage management topics - -# Topics for sending collection management requests to different storage types -vector_storage_management_topic = topic( - 'vector-storage-management', qos='q0', namespace='request' -) - -object_storage_management_topic = topic( - 'object-storage-management', qos='q0', namespace='request' -) - -triples_storage_management_topic = topic( - 'triples-storage-management', qos='q0', namespace='request' -) - -# Topic for receiving responses from storage processors -storage_management_response_topic = topic( - 'storage-management', qos='q0', namespace='response' -) - -############################################################################ +# This file previously contained legacy storage management queue definitions +# (StorageManagementRequest, StorageManagementResponse, and related topics). 
+# +# These have been removed as collection management now uses a config-based +# approach via CollectionConfigHandler instead of request/response queues. +# +# This file is kept for potential future storage-related schema definitions. diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py index 846855a4..6d1b23ba 100644 --- a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py @@ -90,6 +90,15 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService): async def store_document_embeddings(self, message): + # Validate collection exists in config before processing + if not self.collection_exists(message.metadata.user, message.metadata.collection): + logger.warning( + f"Collection {message.metadata.collection} for user {message.metadata.user} " + f"does not exist in config (likely deleted while data was in-flight). " + f"Dropping message." 
+ ) + return + for emb in message.chunks: if emb.chunk is None or emb.chunk == b"": continue @@ -105,7 +114,7 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService): f"d-{message.metadata.user}-{message.metadata.collection}-{dim}" ) - # Lazily create index if it doesn't exist + # Lazily create index if it doesn't exist (but only if authorized in config) if not self.pinecone.has_index(index_name): logger.info(f"Lazily creating Pinecone index {index_name} with dimension {dim}") self.create_index(index_name, dim) diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py index 923ade10..edfa8aa9 100644 --- a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py @@ -41,6 +41,15 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService): async def store_document_embeddings(self, message): + # Validate collection exists in config before processing + if not self.collection_exists(message.metadata.user, message.metadata.collection): + logger.warning( + f"Collection {message.metadata.collection} for user {message.metadata.user} " + f"does not exist in config (likely deleted while data was in-flight). " + f"Dropping message." 
+ ) + return + for emb in message.chunks: chunk = emb.chunk.decode("utf-8") @@ -54,7 +63,7 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService): f"d_{message.metadata.user}_{message.metadata.collection}_{dim}" ) - # Lazily create collection if it doesn't exist + # Lazily create collection if it doesn't exist (but only if authorized in config) if not self.qdrant.collection_exists(collection): logger.info(f"Lazily creating Qdrant collection {collection} with dimension {dim}") self.qdrant.create_collection( diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py index 9fe38720..0bee6ceb 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py @@ -90,6 +90,15 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): async def store_graph_embeddings(self, message): + # Validate collection exists in config before processing + if not self.collection_exists(message.metadata.user, message.metadata.collection): + logger.warning( + f"Collection {message.metadata.collection} for user {message.metadata.user} " + f"does not exist in config (likely deleted while data was in-flight). " + f"Dropping message." 
+ ) + return + for entity in message.entities: if entity.entity.value == "" or entity.entity.value is None: @@ -103,7 +112,7 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): f"t-{message.metadata.user}-{message.metadata.collection}-{dim}" ) - # Lazily create index if it doesn't exist + # Lazily create index if it doesn't exist (but only if authorized in config) if not self.pinecone.has_index(index_name): logger.info(f"Lazily creating Pinecone index {index_name} with dimension {dim}") self.create_index(index_name, dim) diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py index 39127473..e3c2b6bc 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py @@ -41,6 +41,15 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): async def store_graph_embeddings(self, message): + # Validate collection exists in config before processing + if not self.collection_exists(message.metadata.user, message.metadata.collection): + logger.warning( + f"Collection {message.metadata.collection} for user {message.metadata.user} " + f"does not exist in config (likely deleted while data was in-flight). " + f"Dropping message." 
+ ) + return + for entity in message.entities: if entity.entity.value == "" or entity.entity.value is None: return @@ -53,7 +62,7 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): f"t_{message.metadata.user}_{message.metadata.collection}_{dim}" ) - # Lazily create collection if it doesn't exist + # Lazily create collection if it doesn't exist (but only if authorized in config) if not self.qdrant.collection_exists(collection): logger.info(f"Lazily creating Qdrant collection {collection} with dimension {dim}") self.qdrant.create_collection( diff --git a/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py b/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py index 05b8100d..bcb0d57f 100644 --- a/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py @@ -13,9 +13,8 @@ from cassandra import ConsistencyLevel from .... schema import ExtractedObject from .... schema import RowSchema, Field -from .... schema import StorageManagementRequest, StorageManagementResponse -from .... schema import object_storage_management_topic, storage_management_response_topic from .... base import FlowProcessor, ConsumerSpec, ProducerSpec +from .... base import CollectionConfigHandler from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config # Module logger @@ -23,7 +22,7 @@ logger = logging.getLogger(__name__) default_ident = "objects-write" -class Processor(FlowProcessor): +class Processor(CollectionConfigHandler, FlowProcessor): def __init__(self, **params): @@ -64,39 +63,9 @@ class Processor(FlowProcessor): ) ) - # Set up storage management consumer and producer directly - # (FlowProcessor doesn't support topic-based specs outside of flows) - from .... 
base import Consumer, Producer, ConsumerMetrics, ProducerMetrics - - storage_request_metrics = ConsumerMetrics( - processor=self.id, flow=None, name="storage-request" - ) - storage_response_metrics = ProducerMetrics( - processor=self.id, flow=None, name="storage-response" - ) - - # Create storage management consumer - self.storage_request_consumer = Consumer( - taskgroup=self.taskgroup, - backend=self.pubsub, - flow=None, - topic=object_storage_management_topic, - subscriber=f"{id}-storage", - schema=StorageManagementRequest, - handler=self.on_storage_management, - metrics=storage_request_metrics, - ) - - # Create storage management response producer - self.storage_response_producer = Producer( - backend=self.pubsub, - topic=storage_management_response_topic, - schema=StorageManagementResponse, - metrics=storage_response_metrics, - ) - - # Register config handler for schema updates + # Register config handlers self.register_config_handler(self.on_schema_config) + self.register_config_handler(self.on_collection_config) # Cache of known keyspaces/tables self.known_keyspaces: Set[str] = set() @@ -347,28 +316,14 @@ class Processor(FlowProcessor): obj = msg.value() logger.info(f"Storing {len(obj.values)} objects for schema {obj.schema_name} from {obj.metadata.id}") - # Validate collection/keyspace exists before accepting writes - safe_keyspace = self.sanitize_name(obj.metadata.user) - if safe_keyspace not in self.known_keyspaces: - # Check if keyspace actually exists in Cassandra - self.connect_cassandra() - check_keyspace_cql = """ - SELECT keyspace_name FROM system_schema.keyspaces - WHERE keyspace_name = %s - """ - result = self.session.execute(check_keyspace_cql, (safe_keyspace,)) - # Check if result is None (mock case) or has no rows - if result is None or not result.one(): - error_msg = ( - f"Collection {obj.metadata.collection} does not exist. " - f"Create it first via collection management API." 
- ) - logger.error(error_msg) - raise ValueError(error_msg) - # Cache it if it exists - self.known_keyspaces.add(safe_keyspace) - if safe_keyspace not in self.known_tables: - self.known_tables[safe_keyspace] = set() + # Validate collection exists before accepting writes + if not self.collection_exists(obj.metadata.user, obj.metadata.collection): + error_msg = ( + f"Collection {obj.metadata.collection} does not exist. " + f"Create it first via collection management API." + ) + logger.error(error_msg) + raise ValueError(error_msg) # Get schema definition schema = self.schemas.get(obj.schema_name) @@ -447,55 +402,7 @@ class Processor(FlowProcessor): logger.error(f"Failed to insert object {obj_index}: {e}", exc_info=True) raise - async def on_storage_management(self, msg, consumer, flow): - """Handle storage management requests for collection operations""" - request = msg.value() - logger.info(f"Received storage management request: {request.operation} for {request.user}/{request.collection}") - - try: - if request.operation == "create-collection": - await self.create_collection(request.user, request.collection) - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) - logger.info(f"Successfully created collection {request.user}/{request.collection}") - elif request.operation == "delete-collection": - await self.delete_collection(request.user, request.collection) - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) - logger.info(f"Successfully deleted collection {request.user}/{request.collection}") - else: - logger.warning(f"Unknown storage management operation: {request.operation}") - # Send error response - from .... 
schema import Error - response = StorageManagementResponse( - error=Error( - type="unknown_operation", - message=f"Unknown operation: {request.operation}" - ) - ) - await self.storage_response_producer.send(response) - - except Exception as e: - logger.error(f"Error handling storage management request: {e}", exc_info=True) - # Send error response - from .... schema import Error - response = StorageManagementResponse( - error=Error( - type="processing_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) - - async def create_collection(self, user: str, collection: str): + async def create_collection(self, user: str, collection: str, metadata: dict): """Create/verify collection exists in Cassandra object store""" # Connect if not already connected self.connect_cassandra()
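
The config-push reconciliation pattern that this diff migrates every storage backend onto can be sketched as follows. This is a minimal standalone illustration, not the real `trustgraph-base` implementation: the `"user/collection"` key format, the simplified method signatures, and the stubbed `create_collection`/`delete_collection` bodies are assumptions made for the sketch; the real mixin is wired into a processor and receives config via `register_config_handler`.

```python
# Hypothetical, simplified sketch of the CollectionConfigHandler pattern
# shown in the diff: reconcile a known_collections cache against each
# pushed config, creating new collections and deleting removed ones.
# Storage calls are stubs; key format "user/collection" is an assumption.

import asyncio
import logging

logger = logging.getLogger("collection-config")


class CollectionConfigHandler:
    def __init__(self):
        # (user, collection) -> metadata, mirroring the known_collections cache
        self.known_collections = {}

    async def on_collection_config(self, config, version):
        # Treat a missing "collection" key as an empty set of collections
        collection_config = config.get("collection", {})

        current = set()
        for key, metadata in collection_config.items():
            user, collection = key.split("/", 1)
            current.add((user, collection))
            if (user, collection) not in self.known_collections:
                await self.create_collection(user, collection, metadata)
                self.known_collections[(user, collection)] = metadata

        # Collections cached locally but absent from config were deleted
        for user, collection in set(self.known_collections) - current:
            # Remove from the cache FIRST so new writes are rejected
            # before physical deletion begins (the race-condition fix
            # from the diff), then delete the backing storage.
            del self.known_collections[(user, collection)]
            await self.delete_collection(user, collection)

    def collection_exists(self, user, collection):
        # Write paths gate on this before accepting data
        return (user, collection) in self.known_collections

    async def create_collection(self, user, collection, metadata):
        logger.info(f"create {user}/{collection}")  # stub

    async def delete_collection(self, user, collection):
        logger.info(f"delete {user}/{collection}")  # stub


# First push creates two collections; second push deletes one of them
handler = CollectionConfigHandler()
asyncio.run(handler.on_collection_config(
    {"collection": {"alice/docs": {}, "alice/graph": {}}}, 1))
asyncio.run(handler.on_collection_config(
    {"collection": {"alice/docs": {}}}, 2))
print(sorted(handler.known_collections))  # [('alice', 'docs')]
```

This declarative model is what lets the diff delete the request/response machinery: backends no longer acknowledge create/delete operations, they simply converge on whatever the config service last pushed, and `collection_exists` gives each write path a cheap local check.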