diff --git a/docs/tech-specs/multi-tenant-support.md b/docs/tech-specs/multi-tenant-support.md new file mode 100644 index 00000000..bfbad543 --- /dev/null +++ b/docs/tech-specs/multi-tenant-support.md @@ -0,0 +1,768 @@ +# Technical Specification: Multi-Tenant Support + +## Overview + +Enable multi-tenant deployments by fixing parameter name mismatches that prevent queue customization and adding Cassandra keyspace parameterization. + +## Architecture Context + +### Flow-Based Queue Resolution + +The TrustGraph system uses a **flow-based architecture** for dynamic queue resolution, which inherently supports multi-tenancy: + +- **Flow Definitions** are stored in Cassandra and specify queue names via interface definitions +- **Queue names use templates** with `{id}` variables that are replaced with flow instance IDs +- **Services dynamically resolve queues** by looking up flow configurations at request time +- **Each tenant can have unique flows** with different queue names, providing isolation + +Example flow interface definition: +```json +{ + "interfaces": { + "triples-store": "persistent://tg/flow/triples-store:{id}", + "graph-embeddings-store": "persistent://tg/flow/graph-embeddings-store:{id}" + } +} +``` + +When tenant A starts flow `tenant-a-prod` and tenant B starts flow `tenant-b-prod`, they automatically get isolated queues: +- `persistent://tg/flow/triples-store:tenant-a-prod` +- `persistent://tg/flow/triples-store:tenant-b-prod` + +**Services correctly designed for multi-tenancy:** +- βœ… **Knowledge Management (cores)** - Dynamically resolves queues from flow configuration passed in requests + +**Services needing fixes:** +- πŸ”΄ **Config Service** - Parameter name mismatch prevents queue customization +- πŸ”΄ **Librarian Service** - Hardcoded storage management topics (discussed below) +- πŸ”΄ **All Services** - Cannot customize Cassandra keyspace + +## Problem Statement + +### Issue #1: Parameter Name Mismatch in AsyncProcessor +- **CLI defines:** 
`--config-queue` (unclear naming) +- **Argparse converts to:** `config_queue` (in params dict) +- **Code looks for:** `config_push_queue` +- **Result:** Parameter is ignored, defaults to `persistent://tg/config/config` +- **Impact:** Affects all 32+ services inheriting from AsyncProcessor +- **Blocks:** Multi-tenant deployments cannot use tenant-specific config queues +- **Solution:** Rename CLI parameter to `--config-push-queue` for clarity (breaking change acceptable since feature is currently broken) + +### Issue #2: Parameter Name Mismatch in Config Service +- **CLI defines:** `--push-queue` (ambiguous naming) +- **Argparse converts to:** `push_queue` (in params dict) +- **Code looks for:** `config_push_queue` +- **Result:** Parameter is ignored +- **Impact:** Config service cannot use custom push queue +- **Solution:** Rename CLI parameter to `--config-push-queue` for consistency and clarity (breaking change acceptable) + +### Issue #3: Hardcoded Cassandra Keyspace +- **Current:** Keyspace hardcoded as `"config"`, `"knowledge"`, `"librarian"` in various services +- **Result:** Cannot customize keyspace for multi-tenant deployments +- **Impact:** Config, cores, and librarian services +- **Blocks:** Multiple tenants cannot use separate Cassandra keyspaces + +### Issue #4: Collection Management Architecture +- **Current:** Collections stored in Cassandra librarian keyspace via separate collections table +- **Current:** Librarian uses 4 hardcoded storage management topics to coordinate collection create/delete: + - `vector_storage_management_topic` + - `object_storage_management_topic` + - `triples_storage_management_topic` + - `storage_management_response_topic` +- **Problems:** + - Hardcoded topics cannot be customized for multi-tenant deployments + - Complex async coordination between librarian and 4+ storage services + - Separate Cassandra table and management infrastructure + - Non-persistent request/response queues for critical operations +- **Solution:** 
Migrate collections to config service storage, use config push for distribution + +## Solution + +This spec addresses Issues #1, #2, #3, and #4. + +### Part 1: Fix Parameter Name Mismatches + +#### Change 1: AsyncProcessor Base Class - Rename CLI Parameter +**File:** `trustgraph-base/trustgraph/base/async_processor.py` +**Line:** 260-264 + +**Current:** +```python +parser.add_argument( + '--config-queue', + default=default_config_queue, + help=f'Config push queue {default_config_queue}', +) +``` + +**Fixed:** +```python +parser.add_argument( + '--config-push-queue', + default=default_config_queue, + help=f'Config push queue (default: {default_config_queue})', +) +``` + +**Rationale:** +- Clearer, more explicit naming +- Matches the internal variable name `config_push_queue` +- Breaking change acceptable since feature is currently non-functional +- No code change needed in params.get() - it already looks for the correct name + +#### Change 2: Config Service - Rename CLI Parameter +**File:** `trustgraph-flow/trustgraph/config/service/service.py` +**Line:** 276-279 + +**Current:** +```python +parser.add_argument( + '--push-queue', + default=default_config_push_queue, + help=f'Config push queue (default: {default_config_push_queue})' +) +``` + +**Fixed:** +```python +parser.add_argument( + '--config-push-queue', + default=default_config_push_queue, + help=f'Config push queue (default: {default_config_push_queue})' +) +``` + +**Rationale:** +- Clearer naming - "config-push-queue" is more explicit than just "push-queue" +- Matches the internal variable name `config_push_queue` +- Consistent with AsyncProcessor's `--config-push-queue` parameter +- Breaking change acceptable since feature is currently non-functional +- No code change needed in params.get() - it already looks for the correct name + +### Part 2: Add Cassandra Keyspace Parameterization + +#### Change 3: Add Keyspace Parameter to cassandra_config Module +**File:** 
`trustgraph-base/trustgraph/base/cassandra_config.py` + +**Add CLI argument** (in `add_cassandra_args()` function): +```python +parser.add_argument( +    '--cassandra-keyspace', +    default=None, +    help='Cassandra keyspace (default: service-specific)' +) +``` + +**Add environment variable support** (in `resolve_cassandra_config()` function): +```python +keyspace = params.get( +    "cassandra_keyspace", +    os.environ.get("CASSANDRA_KEYSPACE", default_keyspace) +) +``` + +**Update signature and return value** of `resolve_cassandra_config()`: +- Add a `default_keyspace=None` keyword parameter, used as the fallback when neither `--cassandra-keyspace` nor `CASSANDRA_KEYSPACE` is set; each caller passes its service-specific default (see Changes 4-6) +- Currently returns: `(hosts, username, password)` +- Change to return: `(hosts, username, password, keyspace)` + +**Rationale:** +- Consistent with existing Cassandra configuration pattern +- Available to all services via `add_cassandra_args()` +- Supports both CLI and environment variable configuration + +#### Change 4: Config Service - Use Parameterized Keyspace +**File:** `trustgraph-flow/trustgraph/config/service/service.py` + +**Line 30** - Remove hardcoded keyspace: +```python +# DELETE THIS LINE: +keyspace = "config" +``` + +**Lines 69-73** - Update cassandra config resolution: + +**Current:** +```python +cassandra_host, cassandra_username, cassandra_password = \ +    resolve_cassandra_config(params) +``` + +**Fixed:** +```python +cassandra_host, cassandra_username, cassandra_password, keyspace = \ +    resolve_cassandra_config(params, default_keyspace="config") +``` + +**Rationale:** +- Maintains backward compatibility with "config" as default +- Allows override via `--cassandra-keyspace` or `CASSANDRA_KEYSPACE` + +#### Change 5: Cores/Knowledge Service - Use Parameterized Keyspace +**File:** `trustgraph-flow/trustgraph/cores/service.py` + +**Line 37** - Remove hardcoded keyspace: +```python +# DELETE THIS LINE: +keyspace = "knowledge" +``` + +**Update cassandra config resolution** (similar location as config service): +```python +cassandra_host, cassandra_username, cassandra_password, keyspace = \ +    resolve_cassandra_config(params, default_keyspace="knowledge") 
+``` + +#### Change 6: Librarian Service - Use Parameterized Keyspace +**File:** `trustgraph-flow/trustgraph/librarian/service.py` + +**Line 51** - Remove hardcoded keyspace: +```python +# DELETE THIS LINE: +keyspace = "librarian" +``` + +**Update cassandra config resolution** (similar location as config service): +```python +cassandra_host, cassandra_username, cassandra_password, keyspace = \ + resolve_cassandra_config(params, default_keyspace="librarian") +``` + +### Part 3: Migrate Collection Management to Config Service + +#### Overview +Migrate collections from Cassandra librarian keyspace to config service storage. This eliminates hardcoded storage management topics and simplifies the architecture by using the existing config push mechanism for distribution. + +#### Current Architecture +``` +API Request β†’ Gateway β†’ Librarian Service + ↓ + CollectionManager + ↓ + Cassandra Collections Table (librarian keyspace) + ↓ + Broadcast to 4 Storage Management Topics (hardcoded) + ↓ + Wait for 4+ Storage Service Responses + ↓ + Response to Gateway +``` + +#### New Architecture +``` +API Request β†’ Gateway β†’ Librarian Service + ↓ + CollectionManager + ↓ + Config Service API (put/delete/getvalues) + ↓ + Cassandra Config Table (class='collections', key='user:collection') + ↓ + Config Push (to all subscribers on config-push-queue) + ↓ + All Storage Services receive config update independently +``` + +#### Change 7: Collection Manager - Use Config Service API +**File:** `trustgraph-flow/trustgraph/librarian/collection_manager.py` + +**Remove:** +- `LibraryTableStore` usage (Lines 33, 40-41) +- Storage management producers initialization (Lines 86-140) +- `on_storage_response` method (Lines 400-430) +- `pending_deletions` tracking (Lines 57, 90-96, and usage throughout) + +**Add:** +- Config service client for API calls (request/response pattern) + +**Config Client Setup:** +```python +# In __init__, add config request/response producers/consumers +from 
trustgraph.schema.services.config import ConfigRequest, ConfigResponse + +# Producer for config requests +self.config_request_producer = Producer( +    client=pulsar_client, +    topic=config_request_queue, +    schema=ConfigRequest, +) + +# Consumer for config responses (with correlation ID) +self.config_response_consumer = Consumer( +    taskgroup=taskgroup, +    client=pulsar_client, +    flow=None, +    topic=config_response_queue, +    subscriber=f"{id}-config", +    schema=ConfigResponse, +    handler=self.on_config_response, +) + +# Tracking for pending config requests +self.pending_config_requests = {}  # request_id -> asyncio.Event +``` + +**Modify `list_collections` (Lines 145-180):** +```python +async def list_collections(self, user, tag_filter=None, limit=None): +    """List collections from config service""" +    # Send getvalues request to config service +    request = ConfigRequest( +        id=str(uuid.uuid4()), +        operation='getvalues', +        type='collections', +    ) + +    # Send request and wait for response +    response = await self.send_config_request(request) + +    # Parse collections from response +    collections = [] +    for key, value_json in response.values.items(): +        if ":" in key: +            coll_user, collection = key.split(":", 1) +            if coll_user == user: +                metadata = json.loads(value_json) +                collections.append(CollectionMetadata(**metadata)) + +    # Apply tag filtering in-memory (as before) +    if tag_filter: +        collections = [c for c in collections if any(tag in c.tags for tag in tag_filter)] + +    # Apply limit +    if limit: +        collections = collections[:limit] + +    return collections + +async def send_config_request(self, request): +    """Send config request and wait for response""" +    event = asyncio.Event() +    self.pending_config_requests[request.id] = event + +    await self.config_request_producer.send(request) +    try: +        await event.wait() +    finally: +        # Always remove the event entry, even if the wait is cancelled, +        # so abandoned requests do not leak dict entries +        self.pending_config_requests.pop(request.id, None) + +    return self.pending_config_requests.pop(request.id + "_response") + +async def on_config_response(self, message, consumer, flow): +    """Handle config response""" +    response = 
message.value() + if response.id in self.pending_config_requests: + self.pending_config_requests[response.id + "_response"] = response + self.pending_config_requests[response.id].set() +``` + +**Modify `update_collection` (Lines 182-312):** +```python +async def update_collection(self, user, collection, name, description, tags): + """Update collection via config service""" + # Create metadata + metadata = CollectionMetadata( + user=user, + collection=collection, + name=name, + description=description, + tags=tags, + ) + + # Send put request to config service + request = ConfigRequest( + id=str(uuid.uuid4()), + operation='put', + type='collections', + key=f'{user}:{collection}', + value=json.dumps(metadata.to_dict()), + ) + + response = await self.send_config_request(request) + + if response.error: + raise RuntimeError(f"Config update failed: {response.error.message}") + + # Config service will trigger config push automatically + # Storage services will receive update and create collections +``` + +**Modify `delete_collection` (Lines 314-398):** +```python +async def delete_collection(self, user, collection): + """Delete collection via config service""" + # Send delete request to config service + request = ConfigRequest( + id=str(uuid.uuid4()), + operation='delete', + type='collections', + key=f'{user}:{collection}', + ) + + response = await self.send_config_request(request) + + if response.error: + raise RuntimeError(f"Config delete failed: {response.error.message}") + + # Config service will trigger config push automatically + # Storage services will receive update and delete collections +``` + +**Collection Metadata Format:** +- Stored in config table as: `class='collections', key='user:collection'` +- Value is JSON-serialized CollectionMetadata (without timestamp fields) +- Fields: `user`, `collection`, `name`, `description`, `tags` +- Example: `class='collections', key='alice:my-docs', value='{"user":"alice","collection":"my-docs","name":"My 
Documents","description":"...","tags":["work"]}'` + +#### Change 8: Librarian Service - Remove Storage Management Infrastructure +**File:** `trustgraph-flow/trustgraph/librarian/service.py` + +**Remove:** +- Storage management producers (Lines 173-190): + - `vector_storage_management_producer` + - `object_storage_management_producer` + - `triples_storage_management_producer` +- Storage response consumer (Lines 192-201) +- `on_storage_response` handler (Lines 467-473) + +**Modify:** +- CollectionManager initialization (Lines 215-224) - remove storage producer parameters + +**Note:** External collection API remains unchanged: +- `list-collections` +- `update-collection` +- `delete-collection` + +#### Change 9: Remove Collections Table from LibraryTableStore +**File:** `trustgraph-flow/trustgraph/tables/library.py` + +**Delete:** +- Collections table CREATE statement (Lines 114-127) +- Collections prepared statements (Lines 205-240) +- All collection methods (Lines 578-717): + - `ensure_collection_exists` + - `list_collections` + - `update_collection` + - `delete_collection` + - `get_collection` + - `create_collection` + +**Rationale:** +- Collections now stored in config table +- Breaking change acceptable - no data migration needed +- Simplifies librarian service significantly + +#### Change 10: Storage Services - Config-Based Collection Management + +**Affected Services (11 total):** +- Document embeddings: milvus, pinecone, qdrant +- Graph embeddings: milvus, pinecone, qdrant +- Object storage: cassandra +- Triples storage: cassandra, falkordb, memgraph, neo4j + +**Files:** +- `trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py` +- `trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py` +- `trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py` +- `trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py` +- `trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py` +- 
`trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py` +- `trustgraph-flow/trustgraph/storage/objects/cassandra/write.py` +- `trustgraph-flow/trustgraph/storage/triples/cassandra/write.py` +- `trustgraph-flow/trustgraph/storage/triples/falkordb/write.py` +- `trustgraph-flow/trustgraph/storage/triples/memgraph/write.py` +- `trustgraph-flow/trustgraph/storage/triples/neo4j/write.py` + +**Implementation Pattern (all services):** + +1. **Register config handler in `__init__`:** +```python +# Add after AsyncProcessor initialization +self.register_config_handler(self.on_collection_config) +self.known_collections = set() # Track (user, collection) tuples +``` + +2. **Implement config handler:** +```python +async def on_collection_config(self, config, version): + """Handle collection configuration updates""" + logger.info(f"Collection config version: {version}") + + if "collections" not in config: + return + + # Parse collections from config + # Key format: "user:collection" in config["collections"] + config_collections = set() + for key in config["collections"].keys(): + if ":" in key: + user, collection = key.split(":", 1) + config_collections.add((user, collection)) + + # Determine changes + to_create = config_collections - self.known_collections + to_delete = self.known_collections - config_collections + + # Create new collections (idempotent) + for user, collection in to_create: + try: + await self.create_collection_internal(user, collection) + self.known_collections.add((user, collection)) + logger.info(f"Created collection: {user}/{collection}") + except Exception as e: + logger.error(f"Failed to create {user}/{collection}: {e}") + + # Delete removed collections (idempotent) + for user, collection in to_delete: + try: + await self.delete_collection_internal(user, collection) + self.known_collections.discard((user, collection)) + logger.info(f"Deleted collection: {user}/{collection}") + except Exception as e: + logger.error(f"Failed to delete 
{user}/{collection}: {e}") +``` + +3. **Initialize known collections on startup:** +```python +async def start(self): + """Start the processor""" + await super().start() + await self.sync_known_collections() + +async def sync_known_collections(self): + """Query backend to populate known_collections set""" + # Backend-specific implementation: + # - Milvus/Pinecone/Qdrant: List collections/indexes matching naming pattern + # - Cassandra: Query keyspaces or collection metadata + # - Neo4j/Memgraph/FalkorDB: Query CollectionMetadata nodes + pass +``` + +4. **Refactor existing handler methods:** +```python +# Rename and remove response sending: +# handle_create_collection β†’ create_collection_internal +# handle_delete_collection β†’ delete_collection_internal + +async def create_collection_internal(self, user, collection): + """Create collection (idempotent)""" + # Same logic as current handle_create_collection + # But remove response producer calls + # Handle "already exists" gracefully + pass + +async def delete_collection_internal(self, user, collection): + """Delete collection (idempotent)""" + # Same logic as current handle_delete_collection + # But remove response producer calls + # Handle "not found" gracefully + pass +``` + +5. **Remove storage management infrastructure:** + - Remove `self.storage_request_consumer` setup and start + - Remove `self.storage_response_producer` setup + - Remove `on_storage_management` dispatcher method + - Remove metrics for storage management + - Remove imports: `StorageManagementRequest`, `StorageManagementResponse` + +**Backend-Specific Considerations:** + +- **Vector stores (Milvus, Pinecone, Qdrant):** Track logical `(user, collection)` in `known_collections`, but may create multiple backend collections per dimension. Continue lazy creation pattern. Delete operations must remove all dimension variants. + +- **Cassandra Objects:** Collections are row properties, not structures. Track keyspace-level information. 
+ +- **Graph stores (Neo4j, Memgraph, FalkorDB):** Query `CollectionMetadata` nodes on startup. Create/delete metadata nodes on sync. + +- **Cassandra Triples:** Use `KnowledgeGraph` API for collection operations. + +**Key Design Points:** + +- **Eventual consistency:** No request/response mechanism, config push is broadcast +- **Idempotency:** All create/delete operations must be safe to retry +- **Error handling:** Log errors but don't block config updates +- **Self-healing:** Failed operations will retry on next config push +- **Collection key format:** `"user:collection"` in `config["collections"]` + +#### Change 11: Update Collection Schema - Remove Timestamps +**File:** `trustgraph-base/trustgraph/schema/services/collection.py` + +**Modify CollectionMetadata (Lines 13-21):** +Remove `created_at` and `updated_at` fields: +```python +class CollectionMetadata(Record): + user = String() + collection = String() + name = String() + description = String() + tags = Array(String()) + # Remove: created_at = String() + # Remove: updated_at = String() +``` + +**Modify CollectionManagementRequest (Lines 25-47):** +Remove timestamp fields: +```python +class CollectionManagementRequest(Record): + operation = String() + user = String() + collection = String() + timestamp = String() + name = String() + description = String() + tags = Array(String()) + # Remove: created_at = String() + # Remove: updated_at = String() + tag_filter = Array(String()) + limit = Integer() +``` + +**Rationale:** +- Timestamps don't add value for collections +- Config service maintains its own version tracking +- Simplifies schema and reduces storage + +#### Benefits of Config Service Migration + +1. βœ… **Eliminates hardcoded storage management topics** - Solves multi-tenant blocker +2. βœ… **Simpler coordination** - No complex async waiting for 4+ storage responses +3. βœ… **Eventual consistency** - Storage services update independently via config push +4. 
βœ… **Better reliability** - Persistent config push vs non-persistent request/response +5. βœ… **Unified configuration model** - Collections treated as configuration +6. βœ… **Reduces complexity** - Removes ~300 lines of coordination code +7. βœ… **Multi-tenant ready** - Config already supports tenant isolation via keyspace +8. βœ… **Version tracking** - Config service version mechanism provides audit trail + +## Implementation Notes + +### Backward Compatibility + +**Parameter Changes:** +- CLI parameter renames are breaking changes but acceptable (feature currently non-functional) +- Services work without parameters (use defaults) +- Default keyspaces preserved: "config", "knowledge", "librarian" +- Default queue: `persistent://tg/config/config` + +**Collection Management:** +- **Breaking change:** Collections table removed from librarian keyspace +- **No data migration provided** - acceptable for this phase +- External collection API unchanged (list/update/delete operations) +- Collection metadata format simplified (timestamps removed) + +### Testing Requirements + +**Parameter Testing:** +1. Verify `--config-push-queue` parameter works on graph-embeddings service +2. Verify `--config-push-queue` parameter works on text-completion service +3. Verify `--config-push-queue` parameter works on config service +4. Verify `--cassandra-keyspace` parameter works for config service +5. Verify `--cassandra-keyspace` parameter works for cores service +6. Verify `--cassandra-keyspace` parameter works for librarian service +7. Verify services work without parameters (uses defaults) +8. Verify multi-tenant deployment with custom queue names and keyspace + +**Collection Management Testing:** +9. Verify `list-collections` operation via config service +10. Verify `update-collection` creates/updates in config table +11. Verify `delete-collection` removes from config table +12. Verify config push is triggered on collection updates +13. 
Verify tag filtering works with config-based storage +14. Verify collection operations work without timestamp fields + +### Multi-Tenant Deployment Example +```bash +# Tenant: tg-dev +graph-embeddings \ + -p pulsar+ssl://broker:6651 \ + --pulsar-api-key \ + --config-push-queue persistent://tg-dev/config/config + +config-service \ + -p pulsar+ssl://broker:6651 \ + --pulsar-api-key \ + --config-push-queue persistent://tg-dev/config/config \ + --cassandra-keyspace tg_dev_config +``` + +## Impact Analysis + +### Services Affected by Change 1-2 (CLI Parameter Rename) +All services inheriting from AsyncProcessor or FlowProcessor: +- config-service +- cores-service +- librarian-service +- graph-embeddings +- document-embeddings +- text-completion-* (all providers) +- extract-* (all extractors) +- query-* (all query services) +- retrieval-* (all RAG services) +- storage-* (all storage services) +- And 20+ more services + +### Services Affected by Changes 3-6 (Cassandra Keyspace) +- config-service +- cores-service +- librarian-service + +### Services Affected by Changes 7-11 (Collection Management) + +**Immediate Changes:** +- librarian-service (collection_manager.py, service.py) +- tables/library.py (collections table removal) +- schema/services/collection.py (timestamp removal) + +**Deferred Changes (Change 10):** +- All storage services (11 total) - will subscribe to config push for collection updates +- Storage management schema (potentially removable if unused elsewhere) + +## Future Considerations + +### Per-User Keyspace Model + +Some services use **per-user keyspaces** dynamically, where each user gets their own Cassandra keyspace: + +**Services with per-user keyspaces:** +1. **Triples Query Service** (`trustgraph-flow/trustgraph/query/triples/cassandra/service.py:65`) + - Uses `keyspace=query.user` +2. **Objects Query Service** (`trustgraph-flow/trustgraph/query/objects/cassandra/service.py:479`) + - Uses `keyspace=self.sanitize_name(user)` +3. 
**KnowledgeGraph Direct Access** (`trustgraph-flow/trustgraph/direct/cassandra_kg.py:18`) + - Default parameter `keyspace="trustgraph"` + +**Status:** These are **not modified** in this specification. + +**Future Review Required:** +- Evaluate whether per-user keyspace model creates tenant isolation issues +- Consider if multi-tenant deployments need keyspace prefix patterns (e.g., `tenant_a_user1`) +- Review for potential user ID collision across tenants +- Assess if single shared keyspace per tenant with user-based row isolation is preferable + +**Note:** This does not block the current multi-tenant implementation but should be reviewed before production multi-tenant deployments. + +## Implementation Phases + +### Phase 1: Parameter Fixes (Changes 1-6) +- Fix `--config-push-queue` parameter naming +- Add `--cassandra-keyspace` parameter support +- **Outcome:** Multi-tenant queue and keyspace configuration enabled + +### Phase 2: Collection Management Migration (Changes 7-9, 11) +- Migrate collection storage to config service +- Remove collections table from librarian +- Update collection schema (remove timestamps) +- **Outcome:** Eliminates hardcoded storage management topics, simplifies librarian + +### Phase 3: Storage Service Updates (Change 10) - Deferred +- Update all storage services to use config push for collections +- Remove storage management request/response infrastructure +- **Outcome:** Complete config-based collection management + +## References +- GitHub Issue: https://github.com/trustgraph-ai/trustgraph/issues/582 +- Related Files: + - `trustgraph-base/trustgraph/base/async_processor.py` + - `trustgraph-base/trustgraph/base/cassandra_config.py` + - `trustgraph-base/trustgraph/schema/core/topic.py` + - `trustgraph-base/trustgraph/schema/services/collection.py` + - `trustgraph-flow/trustgraph/config/service/service.py` + - `trustgraph-flow/trustgraph/cores/service.py` + - `trustgraph-flow/trustgraph/librarian/service.py` + - 
`trustgraph-flow/trustgraph/librarian/collection_manager.py` + - `trustgraph-flow/trustgraph/tables/library.py` diff --git a/tests/integration/test_cassandra_config_end_to_end.py b/tests/integration/test_cassandra_config_end_to_end.py index e706b76a..a06ec509 100644 --- a/tests/integration/test_cassandra_config_end_to_end.py +++ b/tests/integration/test_cassandra_config_end_to_end.py @@ -373,13 +373,13 @@ class TestMultipleHostsHandling: from trustgraph.base.cassandra_config import resolve_cassandra_config # Test various whitespace scenarios - hosts1, _, _ = resolve_cassandra_config(host='host1, host2 , host3') + hosts1, _, _, _ = resolve_cassandra_config(host='host1, host2 , host3') assert hosts1 == ['host1', 'host2', 'host3'] - hosts2, _, _ = resolve_cassandra_config(host='host1,host2,host3,') + hosts2, _, _, _ = resolve_cassandra_config(host='host1,host2,host3,') assert hosts2 == ['host1', 'host2', 'host3'] - hosts3, _, _ = resolve_cassandra_config(host=' host1 , host2 ') + hosts3, _, _, _ = resolve_cassandra_config(host=' host1 , host2 ') assert hosts3 == ['host1', 'host2'] diff --git a/tests/unit/test_base/test_cassandra_config.py b/tests/unit/test_base/test_cassandra_config.py index 547ff637..5703c7e1 100644 --- a/tests/unit/test_base/test_cassandra_config.py +++ b/tests/unit/test_base/test_cassandra_config.py @@ -145,7 +145,7 @@ class TestResolveCassandraConfig: def test_default_configuration(self): """Test resolution with no parameters or environment variables.""" with patch.dict(os.environ, {}, clear=True): - hosts, username, password = resolve_cassandra_config() + hosts, username, password, keyspace = resolve_cassandra_config() assert hosts == ['cassandra'] assert username is None @@ -160,7 +160,7 @@ class TestResolveCassandraConfig: } with patch.dict(os.environ, env_vars, clear=True): - hosts, username, password = resolve_cassandra_config() + hosts, username, password, keyspace = resolve_cassandra_config() assert hosts == ['env1', 'env2', 'env3'] assert 
username == 'env-user' @@ -175,7 +175,7 @@ class TestResolveCassandraConfig: } with patch.dict(os.environ, env_vars, clear=True): - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host='explicit-host', username='explicit-user', password='explicit-pass' @@ -188,19 +188,19 @@ class TestResolveCassandraConfig: def test_host_list_parsing(self): """Test different host list formats.""" # Single host - hosts, _, _ = resolve_cassandra_config(host='single-host') + hosts, _, _, _ = resolve_cassandra_config(host='single-host') assert hosts == ['single-host'] # Multiple hosts with spaces - hosts, _, _ = resolve_cassandra_config(host='host1, host2 ,host3') + hosts, _, _, _ = resolve_cassandra_config(host='host1, host2 ,host3') assert hosts == ['host1', 'host2', 'host3'] # Empty elements filtered out - hosts, _, _ = resolve_cassandra_config(host='host1,,host2,') + hosts, _, _, _ = resolve_cassandra_config(host='host1,,host2,') assert hosts == ['host1', 'host2'] # Already a list - hosts, _, _ = resolve_cassandra_config(host=['list-host1', 'list-host2']) + hosts, _, _, _ = resolve_cassandra_config(host=['list-host1', 'list-host2']) assert hosts == ['list-host1', 'list-host2'] def test_args_object_resolution(self): @@ -212,7 +212,7 @@ class TestResolveCassandraConfig: cassandra_password = 'args-pass' args = MockArgs() - hosts, username, password = resolve_cassandra_config(args) + hosts, username, password, keyspace = resolve_cassandra_config(args) assert hosts == ['args-host1', 'args-host2'] assert username == 'args-user' @@ -233,7 +233,7 @@ class TestResolveCassandraConfig: with patch.dict(os.environ, env_vars, clear=True): args = PartialArgs() - hosts, username, password = resolve_cassandra_config(args) + hosts, username, password, keyspace = resolve_cassandra_config(args) assert hosts == ['args-host'] # From args assert username == 'env-user' # From env @@ -251,7 +251,7 @@ class 
TestGetCassandraConfigFromParams: 'cassandra_password': 'new-pass' } - hosts, username, password = get_cassandra_config_from_params(params) + hosts, username, password, keyspace = get_cassandra_config_from_params(params) assert hosts == ['new-host1', 'new-host2'] assert username == 'new-user' @@ -265,7 +265,7 @@ class TestGetCassandraConfigFromParams: 'graph_password': 'old-pass' } - hosts, username, password = get_cassandra_config_from_params(params) + hosts, username, password, keyspace = get_cassandra_config_from_params(params) # Should use defaults since graph_* params are not recognized assert hosts == ['cassandra'] # Default @@ -280,7 +280,7 @@ class TestGetCassandraConfigFromParams: 'cassandra_password': 'compat-pass' } - hosts, username, password = get_cassandra_config_from_params(params) + hosts, username, password, keyspace = get_cassandra_config_from_params(params) assert hosts == ['compat-host'] assert username is None # cassandra_user is not recognized @@ -298,7 +298,7 @@ class TestGetCassandraConfigFromParams: 'graph_password': 'old-pass' } - hosts, username, password = get_cassandra_config_from_params(params) + hosts, username, password, keyspace = get_cassandra_config_from_params(params) assert hosts == ['new-host'] # Only cassandra_* params work assert username == 'new-user' # Only cassandra_* params work @@ -314,7 +314,7 @@ class TestGetCassandraConfigFromParams: with patch.dict(os.environ, env_vars, clear=True): params = {} - hosts, username, password = get_cassandra_config_from_params(params) + hosts, username, password, keyspace = get_cassandra_config_from_params(params) assert hosts == ['fallback-host1', 'fallback-host2'] assert username == 'fallback-user' @@ -334,7 +334,7 @@ class TestConfigurationPriority: with patch.dict(os.environ, env_vars, clear=True): # CLI args should override everything - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host='cli-host', 
username='cli-user', password='cli-pass' @@ -354,7 +354,7 @@ class TestConfigurationPriority: with patch.dict(os.environ, env_vars, clear=True): # Only provide host via CLI - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host='cli-host' # username and password not provided ) @@ -366,7 +366,7 @@ class TestConfigurationPriority: def test_no_config_defaults(self): """Test that defaults are used when no configuration is provided.""" with patch.dict(os.environ, {}, clear=True): - hosts, username, password = resolve_cassandra_config() + hosts, username, password, keyspace = resolve_cassandra_config() assert hosts == ['cassandra'] # Default assert username is None # Default @@ -378,17 +378,17 @@ class TestEdgeCases: def test_empty_host_string(self): """Test handling of empty host string falls back to default.""" - hosts, _, _ = resolve_cassandra_config(host='') + hosts, _, _, _ = resolve_cassandra_config(host='') assert hosts == ['cassandra'] # Falls back to default def test_whitespace_only_host(self): """Test handling of whitespace-only host string.""" - hosts, _, _ = resolve_cassandra_config(host=' ') + hosts, _, _, _ = resolve_cassandra_config(host=' ') assert hosts == [] # Empty after stripping whitespace def test_none_values_preserved(self): """Test that None values are preserved correctly.""" - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host=None, username=None, password=None @@ -401,7 +401,7 @@ class TestEdgeCases: def test_mixed_none_and_values(self): """Test mixing None and actual values.""" - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host='mixed-host', username=None, password='mixed-pass' diff --git a/tests/unit/test_storage/test_doc_embeddings_qdrant_storage.py b/tests/unit/test_storage/test_doc_embeddings_qdrant_storage.py index 
f99d9883..3698526a 100644 --- a/tests/unit/test_storage/test_doc_embeddings_qdrant_storage.py +++ b/tests/unit/test_storage/test_doc_embeddings_qdrant_storage.py @@ -15,11 +15,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): """Test Qdrant document embeddings storage functionality""" @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_processor_initialization_basic(self, mock_base_init, mock_qdrant_client): + async def test_processor_initialization_basic(self, mock_qdrant_client): """Test basic Qdrant processor initialization""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_client.return_value = mock_qdrant_instance @@ -34,9 +32,6 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): processor = Processor(**config) # Assert - # Verify base class initialization was called - mock_base_init.assert_called_once() - # Verify QdrantClient was created with correct parameters mock_qdrant_client.assert_called_once_with(url='http://localhost:6333', api_key='test-api-key') @@ -45,11 +40,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): assert processor.qdrant == mock_qdrant_instance @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_processor_initialization_with_defaults(self, mock_base_init, mock_qdrant_client): + async def test_processor_initialization_with_defaults(self, mock_qdrant_client): """Test processor initialization with default values""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_client.return_value = mock_qdrant_instance @@ -68,11 +61,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') 
@patch('trustgraph.storage.doc_embeddings.qdrant.write.uuid') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_store_document_embeddings_basic(self, mock_base_init, mock_uuid, mock_qdrant_client): + async def test_store_document_embeddings_basic(self, mock_uuid, mock_qdrant_client): """Test storing document embeddings with basic message""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = True # Collection already exists mock_qdrant_client.return_value = mock_qdrant_instance @@ -121,11 +112,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') @patch('trustgraph.storage.doc_embeddings.qdrant.write.uuid') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_store_document_embeddings_multiple_chunks(self, mock_base_init, mock_uuid, mock_qdrant_client): + async def test_store_document_embeddings_multiple_chunks(self, mock_uuid, mock_qdrant_client): """Test storing document embeddings with multiple chunks""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = True mock_qdrant_client.return_value = mock_qdrant_instance @@ -180,11 +169,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') @patch('trustgraph.storage.doc_embeddings.qdrant.write.uuid') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_store_document_embeddings_multiple_vectors_per_chunk(self, mock_base_init, mock_uuid, mock_qdrant_client): + async def test_store_document_embeddings_multiple_vectors_per_chunk(self, mock_uuid, mock_qdrant_client): """Test storing document embeddings with multiple vectors per chunk""" # Arrange - mock_base_init.return_value = None 
mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = True mock_qdrant_client.return_value = mock_qdrant_instance @@ -237,11 +224,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): assert point.payload['doc'] == 'multi-vector document chunk' @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_store_document_embeddings_empty_chunk(self, mock_base_init, mock_qdrant_client): + async def test_store_document_embeddings_empty_chunk(self, mock_qdrant_client): """Test storing document embeddings skips empty chunks""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = True # Collection exists mock_qdrant_client.return_value = mock_qdrant_instance @@ -277,11 +262,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') @patch('trustgraph.storage.doc_embeddings.qdrant.write.uuid') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_collection_creation_when_not_exists(self, mock_base_init, mock_uuid, mock_qdrant_client): + async def test_collection_creation_when_not_exists(self, mock_uuid, mock_qdrant_client): """Test that writing to non-existent collection creates it lazily""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = False # Collection doesn't exist mock_qdrant_client.return_value = mock_qdrant_instance @@ -326,11 +309,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') @patch('trustgraph.storage.doc_embeddings.qdrant.write.uuid') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_collection_creation_exception(self, 
mock_base_init, mock_uuid, mock_qdrant_client): + async def test_collection_creation_exception(self, mock_uuid, mock_qdrant_client): """Test that collection creation errors are propagated""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = False # Collection doesn't exist # Simulate creation failure @@ -364,12 +345,10 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): await processor.store_document_embeddings(mock_message) @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') @patch('trustgraph.storage.doc_embeddings.qdrant.write.uuid') - async def test_collection_validation_on_write(self, mock_uuid, mock_base_init, mock_qdrant_client): + async def test_collection_validation_on_write(self, mock_uuid, mock_qdrant_client): """Test collection validation checks collection exists before writing""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = True mock_qdrant_client.return_value = mock_qdrant_instance @@ -428,11 +407,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') @patch('trustgraph.storage.doc_embeddings.qdrant.write.uuid') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_different_dimensions_different_collections(self, mock_base_init, mock_uuid, mock_qdrant_client): + async def test_different_dimensions_different_collections(self, mock_uuid, mock_qdrant_client): """Test that different vector dimensions create different collections""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = True mock_qdrant_client.return_value = mock_qdrant_instance @@ -482,11 +459,9 @@ class 
TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): assert upsert_calls[1][1]['collection_name'] == 'd_dim_user_dim_collection_3' @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_add_args_calls_parent(self, mock_base_init, mock_qdrant_client): + async def test_add_args_calls_parent(self, mock_qdrant_client): """Test that add_args() calls parent add_args method""" # Arrange - mock_base_init.return_value = None mock_qdrant_client.return_value = MagicMock() mock_parser = MagicMock() @@ -502,11 +477,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') @patch('trustgraph.storage.doc_embeddings.qdrant.write.uuid') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_utf8_decoding_handling(self, mock_base_init, mock_uuid, mock_qdrant_client): + async def test_utf8_decoding_handling(self, mock_uuid, mock_qdrant_client): """Test proper UTF-8 decoding of chunk text""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = True mock_qdrant_client.return_value = mock_qdrant_instance @@ -546,11 +519,9 @@ class TestQdrantDocEmbeddingsStorage(IsolatedAsyncioTestCase): assert point.payload['doc'] == 'UTF-8 text with special chars: cafΓ©, naΓ―ve, rΓ©sumΓ©' @patch('trustgraph.storage.doc_embeddings.qdrant.write.QdrantClient') - @patch('trustgraph.base.DocumentEmbeddingsStoreService.__init__') - async def test_chunk_decode_exception_handling(self, mock_base_init, mock_qdrant_client): + async def test_chunk_decode_exception_handling(self, mock_qdrant_client): """Test handling of chunk decode exceptions""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_client.return_value = mock_qdrant_instance diff --git 
a/tests/unit/test_storage/test_graph_embeddings_qdrant_storage.py b/tests/unit/test_storage/test_graph_embeddings_qdrant_storage.py index c4b603c9..c909a1fb 100644 --- a/tests/unit/test_storage/test_graph_embeddings_qdrant_storage.py +++ b/tests/unit/test_storage/test_graph_embeddings_qdrant_storage.py @@ -15,11 +15,9 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase): """Test Qdrant graph embeddings storage functionality""" @patch('trustgraph.storage.graph_embeddings.qdrant.write.QdrantClient') - @patch('trustgraph.base.GraphEmbeddingsStoreService.__init__') - async def test_processor_initialization_basic(self, mock_base_init, mock_qdrant_client): + async def test_processor_initialization_basic(self, mock_qdrant_client): """Test basic Qdrant processor initialization""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_client.return_value = mock_qdrant_instance @@ -34,9 +32,6 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase): processor = Processor(**config) # Assert - # Verify base class initialization was called - mock_base_init.assert_called_once() - # Verify QdrantClient was created with correct parameters mock_qdrant_client.assert_called_once_with(url='http://localhost:6333', api_key='test-api-key') @@ -46,11 +41,9 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase): @patch('trustgraph.storage.graph_embeddings.qdrant.write.QdrantClient') @patch('trustgraph.storage.graph_embeddings.qdrant.write.uuid') - @patch('trustgraph.base.GraphEmbeddingsStoreService.__init__') - async def test_store_graph_embeddings_basic(self, mock_base_init, mock_uuid, mock_qdrant_client): + async def test_store_graph_embeddings_basic(self, mock_uuid, mock_qdrant_client): """Test storing graph embeddings with basic message""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = True # Collection already exists 
mock_qdrant_client.return_value = mock_qdrant_instance @@ -98,11 +91,9 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase): @patch('trustgraph.storage.graph_embeddings.qdrant.write.QdrantClient') @patch('trustgraph.storage.graph_embeddings.qdrant.write.uuid') - @patch('trustgraph.base.GraphEmbeddingsStoreService.__init__') - async def test_store_graph_embeddings_multiple_entities(self, mock_base_init, mock_uuid, mock_qdrant_client): + async def test_store_graph_embeddings_multiple_entities(self, mock_uuid, mock_qdrant_client): """Test storing graph embeddings with multiple entities""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = True mock_qdrant_client.return_value = mock_qdrant_instance @@ -156,11 +147,9 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase): @patch('trustgraph.storage.graph_embeddings.qdrant.write.QdrantClient') @patch('trustgraph.storage.graph_embeddings.qdrant.write.uuid') - @patch('trustgraph.base.GraphEmbeddingsStoreService.__init__') - async def test_store_graph_embeddings_multiple_vectors_per_entity(self, mock_base_init, mock_uuid, mock_qdrant_client): + async def test_store_graph_embeddings_multiple_vectors_per_entity(self, mock_uuid, mock_qdrant_client): """Test storing graph embeddings with multiple vectors per entity""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_instance.collection_exists.return_value = True mock_qdrant_client.return_value = mock_qdrant_instance @@ -212,11 +201,9 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase): assert point.payload['entity'] == 'multi_vector_entity' @patch('trustgraph.storage.graph_embeddings.qdrant.write.QdrantClient') - @patch('trustgraph.base.GraphEmbeddingsStoreService.__init__') - async def test_store_graph_embeddings_empty_entity_value(self, mock_base_init, mock_qdrant_client): + async def 
test_store_graph_embeddings_empty_entity_value(self, mock_qdrant_client): """Test storing graph embeddings skips empty entity values""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_client.return_value = mock_qdrant_instance @@ -253,11 +240,9 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase): mock_qdrant_instance.collection_exists.assert_not_called() @patch('trustgraph.storage.graph_embeddings.qdrant.write.QdrantClient') - @patch('trustgraph.base.GraphEmbeddingsStoreService.__init__') - async def test_processor_initialization_with_defaults(self, mock_base_init, mock_qdrant_client): + async def test_processor_initialization_with_defaults(self, mock_qdrant_client): """Test processor initialization with default values""" # Arrange - mock_base_init.return_value = None mock_qdrant_instance = MagicMock() mock_qdrant_client.return_value = mock_qdrant_instance @@ -275,11 +260,9 @@ class TestQdrantGraphEmbeddingsStorage(IsolatedAsyncioTestCase): mock_qdrant_client.assert_called_once_with(url='http://localhost:6333', api_key=None) @patch('trustgraph.storage.graph_embeddings.qdrant.write.QdrantClient') - @patch('trustgraph.base.GraphEmbeddingsStoreService.__init__') - async def test_add_args_calls_parent(self, mock_base_init, mock_qdrant_client): + async def test_add_args_calls_parent(self, mock_qdrant_client): """Test that add_args() calls parent add_args method""" # Arrange - mock_base_init.return_value = None mock_qdrant_client.return_value = MagicMock() mock_parser = MagicMock() diff --git a/trustgraph-base/trustgraph/base/__init__.py b/trustgraph-base/trustgraph/base/__init__.py index b329f52e..2bb80c6c 100644 --- a/trustgraph-base/trustgraph/base/__init__.py +++ b/trustgraph-base/trustgraph/base/__init__.py @@ -33,4 +33,5 @@ from . tool_service import ToolService from . tool_client import ToolClientSpec from . agent_client import AgentClientSpec from . 
structured_query_client import StructuredQueryClientSpec +from . collection_config_handler import CollectionConfigHandler diff --git a/trustgraph-base/trustgraph/base/async_processor.py b/trustgraph-base/trustgraph/base/async_processor.py index e496da7c..6f0ced11 100644 --- a/trustgraph-base/trustgraph/base/async_processor.py +++ b/trustgraph-base/trustgraph/base/async_processor.py @@ -258,9 +258,9 @@ class AsyncProcessor: PulsarClient.add_args(parser) parser.add_argument( - '--config-queue', + '--config-push-queue', default=default_config_queue, - help=f'Config push queue {default_config_queue}', + help=f'Config push queue (default: {default_config_queue})', ) parser.add_argument( diff --git a/trustgraph-base/trustgraph/base/cassandra_config.py b/trustgraph-base/trustgraph/base/cassandra_config.py index 46a1745d..bacc4313 100644 --- a/trustgraph-base/trustgraph/base/cassandra_config.py +++ b/trustgraph-base/trustgraph/base/cassandra_config.py @@ -13,14 +13,15 @@ from typing import Optional, Tuple, List, Any def get_cassandra_defaults() -> dict: """ Get default Cassandra configuration values from environment variables or fallback defaults. 
- + Returns: - dict: Dictionary with 'host', 'username', and 'password' keys + dict: Dictionary with 'host', 'username', 'password', and 'keyspace' keys """ return { 'host': os.getenv('CASSANDRA_HOST', 'cassandra'), 'username': os.getenv('CASSANDRA_USERNAME'), - 'password': os.getenv('CASSANDRA_PASSWORD') + 'password': os.getenv('CASSANDRA_PASSWORD'), + 'keyspace': os.getenv('CASSANDRA_KEYSPACE') } @@ -53,82 +54,108 @@ def add_cassandra_args(parser: argparse.ArgumentParser) -> None: password_help += " (default: )" if 'CASSANDRA_PASSWORD' in os.environ: password_help += " [from CASSANDRA_PASSWORD]" - + + keyspace_help = "Cassandra keyspace (default: service-specific)" + if defaults['keyspace']: + keyspace_help = f"Cassandra keyspace (default: {defaults['keyspace']})" + if 'CASSANDRA_KEYSPACE' in os.environ: + keyspace_help += " [from CASSANDRA_KEYSPACE]" + parser.add_argument( '--cassandra-host', default=defaults['host'], help=host_help ) - + parser.add_argument( '--cassandra-username', default=defaults['username'], help=username_help ) - + parser.add_argument( '--cassandra-password', default=defaults['password'], help=password_help ) + parser.add_argument( + '--cassandra-keyspace', + default=defaults['keyspace'], + help=keyspace_help + ) + def resolve_cassandra_config( args: Optional[Any] = None, host: Optional[str] = None, username: Optional[str] = None, - password: Optional[str] = None -) -> Tuple[List[str], Optional[str], Optional[str]]: + password: Optional[str] = None, + default_keyspace: Optional[str] = None +) -> Tuple[List[str], Optional[str], Optional[str], Optional[str]]: """ Resolve Cassandra configuration from various sources. - + Can accept either argparse args object or explicit parameters. Converts host string to list format for Cassandra driver. 
- + Args: - args: Optional argparse namespace with cassandra_host, cassandra_username, cassandra_password + args: Optional argparse namespace with cassandra_host, cassandra_username, cassandra_password, cassandra_keyspace host: Optional explicit host parameter (overrides args) username: Optional explicit username parameter (overrides args) password: Optional explicit password parameter (overrides args) - + default_keyspace: Optional default keyspace if not specified elsewhere + Returns: - tuple: (hosts_list, username, password) + tuple: (hosts_list, username, password, keyspace) """ # If args provided, extract values + keyspace = None if args is not None: host = host or getattr(args, 'cassandra_host', None) username = username or getattr(args, 'cassandra_username', None) password = password or getattr(args, 'cassandra_password', None) - + keyspace = getattr(args, 'cassandra_keyspace', None) + # Apply defaults if still None defaults = get_cassandra_defaults() host = host or defaults['host'] username = username or defaults['username'] password = password or defaults['password'] - + keyspace = keyspace or defaults['keyspace'] or default_keyspace + # Convert host string to list if isinstance(host, str): hosts = [h.strip() for h in host.split(',') if h.strip()] else: hosts = host - - return hosts, username, password + + return hosts, username, password, keyspace -def get_cassandra_config_from_params(params: dict) -> Tuple[List[str], Optional[str], Optional[str]]: +def get_cassandra_config_from_params( + params: dict, + default_keyspace: Optional[str] = None +) -> Tuple[List[str], Optional[str], Optional[str], Optional[str]]: """ Extract and resolve Cassandra configuration from a parameters dictionary. 
- + Args: params: Dictionary of parameters that may contain Cassandra configuration - + default_keyspace: Optional default keyspace if not specified in params + Returns: - tuple: (hosts_list, username, password) + tuple: (hosts_list, username, password, keyspace) """ # Get Cassandra parameters host = params.get('cassandra_host') username = params.get('cassandra_username') password = params.get('cassandra_password') - + # Use resolve function to handle defaults and list conversion - return resolve_cassandra_config(host=host, username=username, password=password) \ No newline at end of file + return resolve_cassandra_config( + host=host, + username=username, + password=password, + default_keyspace=default_keyspace + ) \ No newline at end of file diff --git a/trustgraph-base/trustgraph/base/collection_config_handler.py b/trustgraph-base/trustgraph/base/collection_config_handler.py new file mode 100644 index 00000000..2d752b3c --- /dev/null +++ b/trustgraph-base/trustgraph/base/collection_config_handler.py @@ -0,0 +1,127 @@ +""" +Handler for storage services to process collection configuration from config push +""" + +import json +import logging +from typing import Dict, Set + +logger = logging.getLogger(__name__) + +class CollectionConfigHandler: + """ + Handles collection configuration from config push messages for storage services. + + Storage services should: + 1. Inherit from this class along with their service base class + 2. Call register_config_handler(self.on_collection_config) in __init__ + 3. Implement create_collection(user, collection, metadata) method + 4. 
Implement delete_collection(user, collection) method + """ + + def __init__(self, **kwargs): + # Track known collections: {(user, collection): metadata_dict} + self.known_collections: Dict[tuple, dict] = {} + # Pass remaining kwargs up the inheritance chain + super().__init__(**kwargs) + + async def on_collection_config(self, config: dict, version: int): + """ + Handle config push messages and extract collection information + + Args: + config: Configuration dictionary from ConfigPush message + version: Configuration version number + """ + logger.info(f"Processing collection configuration (version {version})") + + # Extract collections from config + if "collection" not in config: + logger.debug("No collection configuration in config push") + return + + collection_config = config["collection"] + + # Track which collections we've seen in this config + current_collections: Set[tuple] = set() + + # Process each collection in the config + for key, value_json in collection_config.items(): + try: + # Parse user:collection key + if ":" not in key: + logger.warning(f"Invalid collection key format (expected user:collection): {key}") + continue + + user, collection = key.split(":", 1) + current_collections.add((user, collection)) + + # Parse metadata + metadata = json.loads(value_json) + + # Check if this is a new collection or updated + collection_key = (user, collection) + if collection_key not in self.known_collections: + logger.info(f"New collection detected: {user}/{collection}") + await self.create_collection(user, collection, metadata) + self.known_collections[collection_key] = metadata + else: + # Collection already exists, update metadata if changed + if self.known_collections[collection_key] != metadata: + logger.info(f"Collection metadata updated: {user}/{collection}") + # Most storage services don't need to do anything for metadata updates + # They just need to know the collection exists + self.known_collections[collection_key] = metadata + + except Exception as e: 
+ logger.error(f"Error processing collection config for key {key}: {e}", exc_info=True) + + # Find collections that were deleted (in known but not in current) + deleted_collections = set(self.known_collections.keys()) - current_collections + for user, collection in deleted_collections: + logger.info(f"Collection deleted: {user}/{collection}") + try: + await self.delete_collection(user, collection) + del self.known_collections[(user, collection)] + except Exception as e: + logger.error(f"Error deleting collection {user}/{collection}: {e}", exc_info=True) + + logger.debug(f"Collection config processing complete. Known collections: {len(self.known_collections)}") + + async def create_collection(self, user: str, collection: str, metadata: dict): + """ + Create a collection in the storage backend. + + Subclasses must implement this method. + + Args: + user: User ID + collection: Collection ID + metadata: Collection metadata dictionary + """ + raise NotImplementedError("Storage service must implement create_collection method") + + async def delete_collection(self, user: str, collection: str): + """ + Delete a collection from the storage backend. + + Subclasses must implement this method. 
+ + Args: + user: User ID + collection: Collection ID + """ + raise NotImplementedError("Storage service must implement delete_collection method") + + def collection_exists(self, user: str, collection: str) -> bool: + """ + Check if a collection is known to exist + + Args: + user: User ID + collection: Collection ID + + Returns: + True if collection exists, False otherwise + """ + return (user, collection) in self.known_collections diff --git a/trustgraph-base/trustgraph/schema/services/collection.py b/trustgraph-base/trustgraph/schema/services/collection.py index 905b2056..04f644e8 100644 --- a/trustgraph-base/trustgraph/schema/services/collection.py +++ b/trustgraph-base/trustgraph/schema/services/collection.py @@ -17,8 +17,6 @@ class CollectionMetadata(Record): name = String() description = String() tags = Array(String()) - created_at = String() # ISO timestamp - updated_at = String() # ISO timestamp ############################################################################ @@ -33,8 +31,6 @@ class CollectionManagementRequest(Record): name = String() description = String() tags = Array(String()) - created_at = String() # ISO timestamp - updated_at = String() # ISO timestamp # For list tag_filter = Array(String()) # Optional filter by tags diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index 84ed2a6a..0a8ee8a1 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -26,9 +26,6 @@ from ... base import Consumer, Producer # Module logger logger = logging.getLogger(__name__) -# FIXME: How to ensure this doesn't conflict with other usage? 
-keyspace = "config" - default_ident = "config-svc" default_config_request_queue = config_request_queue @@ -64,12 +61,13 @@ class Processor(AsyncProcessor): cassandra_host = params.get("cassandra_host") cassandra_username = params.get("cassandra_username") cassandra_password = params.get("cassandra_password") - + # Resolve configuration with environment variable fallback - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, - password=cassandra_password + password=cassandra_password, + default_keyspace="config" ) # Store resolved configuration @@ -273,7 +271,7 @@ class Processor(AsyncProcessor): ) parser.add_argument( - '--push-queue', + '--config-push-queue', default=default_config_push_queue, help=f'Config push queue (default: {default_config_push_queue})' ) diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index 9cb0e1d0..4b1573a9 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -33,9 +33,6 @@ default_knowledge_response_queue = knowledge_response_queue default_cassandra_host = "cassandra" -# FIXME: How to ensure this doesn't conflict with other usage? 
-keyspace = "knowledge" - class Processor(AsyncProcessor): def __init__(self, **params): @@ -53,14 +50,15 @@ class Processor(AsyncProcessor): cassandra_host = params.get("cassandra_host") cassandra_username = params.get("cassandra_username") cassandra_password = params.get("cassandra_password") - + # Resolve configuration with environment variable fallback - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, - password=cassandra_password + password=cassandra_password, + default_keyspace="knowledge" ) - + # Store resolved configuration self.cassandra_host = hosts self.cassandra_username = username diff --git a/trustgraph-flow/trustgraph/librarian/collection_manager.py b/trustgraph-flow/trustgraph/librarian/collection_manager.py index 1530ed84..d3e1b369 100644 --- a/trustgraph-flow/trustgraph/librarian/collection_manager.py +++ b/trustgraph-flow/trustgraph/librarian/collection_manager.py @@ -1,142 +1,130 @@ """ -Collection management for the librarian +Collection management for the librarian - uses config service for storage """ import asyncio import logging +import json +import uuid from datetime import datetime from typing import Dict, Any, List, Optional from .. schema import CollectionManagementRequest, CollectionManagementResponse, Error from .. schema import CollectionMetadata -from .. schema import StorageManagementRequest, StorageManagementResponse +from .. schema import ConfigRequest, ConfigResponse from .. exceptions import RequestError -from .. 
tables.library import LibraryTableStore # Module logger logger = logging.getLogger(__name__) class CollectionManager: - """Manages collection metadata and coordinates collection operations across storage types""" + """Manages collection metadata via config service""" def __init__( self, - cassandra_host, - cassandra_username, - cassandra_password, - keyspace, - vector_storage_producer=None, - object_storage_producer=None, - triples_storage_producer=None, - storage_response_consumer=None + config_request_producer, + config_response_consumer, + taskgroup ): """ Initialize the CollectionManager Args: - cassandra_host: Cassandra host(s) - cassandra_username: Cassandra username - cassandra_password: Cassandra password - keyspace: Cassandra keyspace for library data - vector_storage_producer: Producer for vector storage management - object_storage_producer: Producer for object storage management - triples_storage_producer: Producer for triples storage management - storage_response_consumer: Consumer for storage management responses + config_request_producer: Producer for config service requests + config_response_consumer: Consumer for config service responses + taskgroup: Task group for async operations """ - self.table_store = LibraryTableStore( - cassandra_host, cassandra_username, cassandra_password, keyspace - ) + self.config_request_producer = config_request_producer + self.config_response_consumer = config_response_consumer + self.taskgroup = taskgroup - # Storage management producers - self.vector_storage_producer = vector_storage_producer - self.object_storage_producer = object_storage_producer - self.triples_storage_producer = triples_storage_producer - self.storage_response_consumer = storage_response_consumer + # Track pending config requests + self.pending_config_requests = {} - # Track pending deletion operations - self.pending_deletions = {} + logger.info("Collection manager initialized with config service backend") - logger.info("Collection manager 
initialized") + async def send_config_request(self, request: ConfigRequest) -> ConfigResponse: + """ + Send config request and wait for response + + Args: + request: Config service request + + Returns: + ConfigResponse from config service + """ + event = asyncio.Event() + self.pending_config_requests[request.id] = event + + await self.config_request_producer.send(request) + await event.wait() + + response = self.pending_config_requests.pop(request.id + "_response") + return response + + async def on_config_response(self, message, consumer, flow): + """ + Handle config response + + Args: + message: Pulsar message + consumer: Consumer instance + flow: Flow context + """ + response = message.value() + if response.id in self.pending_config_requests: + self.pending_config_requests[response.id + "_response"] = response + self.pending_config_requests[response.id].set() async def ensure_collection_exists(self, user: str, collection: str): """ - Ensure a collection exists, creating it if necessary with broadcast to storage + Ensure a collection exists, creating it if necessary Args: user: User ID collection: Collection ID """ try: - # Check if collection already exists - existing = await self.table_store.get_collection(user, collection) - if existing: + # Check if collection exists via config service + request = ConfigRequest( + id=str(uuid.uuid4()), + operation='get', + type='collection', + keys=[f'{user}:{collection}'] + ) + + response = await self.send_config_request(request) + + # If collection exists, we're done + if response.values and len(response.values) > 0: logger.debug(f"Collection {user}/{collection} already exists") return # Create new collection with default metadata - logger.info(f"Auto-creating collection {user}/{collection} from document submission") - await self.table_store.create_collection( + logger.info(f"Auto-creating collection {user}/{collection}") + + metadata = CollectionMetadata( user=user, collection=collection, name=collection, # Default name to 
collection ID description="", - tags=set() + tags=[] ) - # Broadcast collection creation to all storage backends - creation_key = (user, collection) - logger.info(f"Broadcasting create-collection for {creation_key}") - - self.pending_deletions[creation_key] = { - "responses_pending": 4, # doc-embeddings, graph-embeddings, object, triples - "responses_received": [], - "all_successful": True, - "error_messages": [], - "deletion_complete": asyncio.Event() - } - - storage_request = StorageManagementRequest( - operation="create-collection", - user=user, - collection=collection + request = ConfigRequest( + id=str(uuid.uuid4()), + operation='put', + type='collection', + key=f'{user}:{collection}', + value=json.dumps(metadata.to_dict()) ) - # Send creation requests to all storage types - if self.vector_storage_producer: - await self.vector_storage_producer.send(storage_request) - if self.object_storage_producer: - await self.object_storage_producer.send(storage_request) - if self.triples_storage_producer: - await self.triples_storage_producer.send(storage_request) + response = await self.send_config_request(request) - # Wait for all storage creations to complete (with timeout) - creation_info = self.pending_deletions[creation_key] - try: - await asyncio.wait_for( - creation_info["deletion_complete"].wait(), - timeout=30.0 # 30 second timeout - ) - except asyncio.TimeoutError: - logger.error(f"Timeout waiting for storage creation responses for {creation_key}") - creation_info["all_successful"] = False - creation_info["error_messages"].append("Timeout waiting for storage creation") + if response.error: + raise RuntimeError(f"Config update failed: {response.error.message}") - # Check if all creations succeeded - if not creation_info["all_successful"]: - error_msg = f"Storage creation failed: {'; '.join(creation_info['error_messages'])}" - logger.error(error_msg) - - # Clean up metadata on failure - await self.table_store.delete_collection(user, collection) - - # Clean up 
tracking - del self.pending_deletions[creation_key] - - raise RuntimeError(error_msg) - - # Clean up tracking - del self.pending_deletions[creation_key] - logger.info(f"Collection {creation_key} auto-created successfully in all storage backends") + logger.info(f"Collection {user}/{collection} auto-created in config service") except Exception as e: logger.error(f"Error ensuring collection exists: {e}") @@ -144,7 +132,7 @@ class CollectionManager: async def list_collections(self, request: CollectionManagementRequest) -> CollectionManagementResponse: """ - List collections for a user with optional tag filtering + List collections for a user from config service Args: request: Collection management request @@ -153,25 +141,43 @@ class CollectionManager: CollectionManagementResponse with list of collections """ try: - tag_filter = list(request.tag_filter) if request.tag_filter else None - collections = await self.table_store.list_collections(request.user, tag_filter) + # Get all collections from config service + config_request = ConfigRequest( + id=str(uuid.uuid4()), + operation='getvalues', + type='collection' + ) - collection_metadata = [ - CollectionMetadata( - user=coll["user"], - collection=coll["collection"], - name=coll["name"], - description=coll["description"], - tags=coll["tags"], - created_at=coll["created_at"], - updated_at=coll["updated_at"] - ) - for coll in collections - ] + response = await self.send_config_request(config_request) + + if response.error: + raise RuntimeError(f"Config query failed: {response.error.message}") + + # Parse collections and filter by user + collections = [] + for key, value_json in response.values.items(): + if ":" in key: + coll_user, coll_name = key.split(":", 1) + if coll_user == request.user: + metadata_dict = json.loads(value_json) + metadata = CollectionMetadata(**metadata_dict) + collections.append(metadata) + + # Apply tag filtering if specified + if request.tag_filter: + tag_filter_set = set(request.tag_filter) + 
collections = [ + c for c in collections + if any(tag in tag_filter_set for tag in c.tags) + ] + + # Apply limit if specified + if request.limit and request.limit > 0: + collections = collections[:request.limit] return CollectionManagementResponse( error=None, - collections=collection_metadata, + collections=collections, timestamp=datetime.now().isoformat() ) @@ -181,7 +187,7 @@ class CollectionManager: async def update_collection(self, request: CollectionManagementRequest) -> CollectionManagementResponse: """ - Update collection metadata (creates if doesn't exist) + Update collection metadata via config service (creates if doesn't exist) Args: request: Collection management request @@ -190,120 +196,41 @@ class CollectionManager: CollectionManagementResponse with updated collection """ try: - # Check if collection exists, create if it doesn't - existing = await self.table_store.get_collection(request.user, request.collection) - if not existing: - # Create new collection with provided metadata - logger.info(f"Creating new collection {request.user}/{request.collection}") + # Create metadata from request + name = request.name if request.name else request.collection + description = request.description if request.description else "" + tags = list(request.tags) if request.tags else [] - name = request.name if request.name else request.collection - description = request.description if request.description else "" - tags = set(request.tags) if request.tags else set() + metadata = CollectionMetadata( + user=request.user, + collection=request.collection, + name=name, + description=description, + tags=tags + ) - await self.table_store.create_collection( - user=request.user, - collection=request.collection, - name=name, - description=description, - tags=tags - ) + # Send put request to config service + config_request = ConfigRequest( + id=str(uuid.uuid4()), + operation='put', + type='collection', + key=f'{request.user}:{request.collection}', + 
value=json.dumps(metadata.to_dict()) + ) - # Broadcast collection creation to all storage backends - creation_key = (request.user, request.collection) - logger.info(f"Broadcasting create-collection for {creation_key}") + response = await self.send_config_request(config_request) - self.pending_deletions[creation_key] = { - "responses_pending": 4, # doc-embeddings, graph-embeddings, object, triples - "responses_received": [], - "all_successful": True, - "error_messages": [], - "deletion_complete": asyncio.Event() - } + if response.error: + raise RuntimeError(f"Config update failed: {response.error.message}") - storage_request = StorageManagementRequest( - operation="create-collection", - user=request.user, - collection=request.collection - ) + logger.info(f"Collection {request.user}/{request.collection} updated in config service") - # Send creation requests to all storage types - if self.vector_storage_producer: - await self.vector_storage_producer.send(storage_request) - if self.object_storage_producer: - await self.object_storage_producer.send(storage_request) - if self.triples_storage_producer: - await self.triples_storage_producer.send(storage_request) - - # Wait for all storage creations to complete (with timeout) - creation_info = self.pending_deletions[creation_key] - try: - await asyncio.wait_for( - creation_info["deletion_complete"].wait(), - timeout=30.0 # 30 second timeout - ) - except asyncio.TimeoutError: - logger.error(f"Timeout waiting for storage creation responses for {creation_key}") - creation_info["all_successful"] = False - creation_info["error_messages"].append("Timeout waiting for storage creation") - - # Check if all creations succeeded - if not creation_info["all_successful"]: - error_msg = f"Storage creation failed: {'; '.join(creation_info['error_messages'])}" - logger.error(error_msg) - - # Clean up metadata on failure - await self.table_store.delete_collection(request.user, request.collection) - - # Clean up tracking - del 
self.pending_deletions[creation_key] - - return CollectionManagementResponse( - error=Error( - type="storage_creation_error", - message=error_msg - ), - timestamp=datetime.now().isoformat() - ) - - # Clean up tracking - del self.pending_deletions[creation_key] - logger.info(f"Collection {creation_key} created successfully in all storage backends") - - # Get the newly created collection for response - created_collection = await self.table_store.get_collection(request.user, request.collection) - - collection_metadata = CollectionMetadata( - user=created_collection["user"], - collection=created_collection["collection"], - name=created_collection["name"], - description=created_collection["description"], - tags=created_collection["tags"], - created_at=created_collection["created_at"], - updated_at=created_collection["updated_at"] - ) - else: - # Collection exists, update it - name = request.name if request.name else None - description = request.description if request.description else None - tags = list(request.tags) if request.tags else None - - updated_collection = await self.table_store.update_collection( - request.user, request.collection, name, description, tags - ) - - collection_metadata = CollectionMetadata( - user=updated_collection["user"], - collection=updated_collection["collection"], - name=updated_collection["name"], - description=updated_collection["description"], - tags=updated_collection["tags"], - created_at="", # Not returned by update - updated_at=updated_collection["updated_at"] - ) + # Config service will trigger config push automatically + # Storage services will receive update and create/update collections return CollectionManagementResponse( error=None, - collections=[collection_metadata], + collections=[metadata], timestamp=datetime.now().isoformat() ) @@ -313,7 +240,7 @@ class CollectionManager: async def delete_collection(self, request: CollectionManagementRequest) -> CollectionManagementResponse: """ - Delete collection with cascade to all 
storage types + Delete collection via config service Args: request: Collection management request @@ -322,68 +249,25 @@ class CollectionManager: CollectionManagementResponse indicating success or failure """ try: - deletion_key = (request.user, request.collection) + logger.info(f"Deleting collection {request.user}/{request.collection}") - logger.info(f"Starting cascade deletion for {request.user}/{request.collection}") - - # Track this deletion request - self.pending_deletions[deletion_key] = { - "responses_pending": 4, # doc-embeddings, graph-embeddings, object, triples - "responses_received": [], - "all_successful": True, - "error_messages": [], - "deletion_complete": asyncio.Event() - } - - # Create storage management request - storage_request = StorageManagementRequest( - operation="delete-collection", - user=request.user, - collection=request.collection + # Send delete request to config service + config_request = ConfigRequest( + id=str(uuid.uuid4()), + operation='delete', + type='collection', + key=f'{request.user}:{request.collection}' ) - # Send deletion requests to all storage types - if self.vector_storage_producer: - await self.vector_storage_producer.send(storage_request) - if self.object_storage_producer: - await self.object_storage_producer.send(storage_request) - if self.triples_storage_producer: - await self.triples_storage_producer.send(storage_request) + response = await self.send_config_request(config_request) - # Wait for all storage deletions to complete (with timeout) - deletion_info = self.pending_deletions[deletion_key] - try: - await asyncio.wait_for( - deletion_info["deletion_complete"].wait(), - timeout=30.0 # 30 second timeout - ) - except asyncio.TimeoutError: - logger.error(f"Timeout waiting for storage deletion responses for {deletion_key}") - deletion_info["all_successful"] = False - deletion_info["error_messages"].append("Timeout waiting for storage deletion") + if response.error: + raise RuntimeError(f"Config delete failed: 
{response.error.message}") - # Check if all deletions succeeded - if not deletion_info["all_successful"]: - error_msg = f"Storage deletion failed: {'; '.join(deletion_info['error_messages'])}" - logger.error(error_msg) + logger.info(f"Collection {request.user}/{request.collection} deleted from config service") - # Clean up tracking - del self.pending_deletions[deletion_key] - - return CollectionManagementResponse( - error=Error( - type="storage_deletion_error", - message=error_msg - ), - timestamp=datetime.now().isoformat() - ) - - # All storage deletions succeeded, now delete metadata - logger.info(f"Storage deletions complete, removing metadata for {deletion_key}") - await self.table_store.delete_collection(request.user, request.collection) - - # Clean up tracking - del self.pending_deletions[deletion_key] + # Config service will trigger config push automatically + # Storage services will receive update and delete collections return CollectionManagementResponse( error=None, @@ -392,39 +276,4 @@ class CollectionManager: except Exception as e: logger.error(f"Error deleting collection: {e}") - # Clean up tracking on error - if deletion_key in self.pending_deletions: - del self.pending_deletions[deletion_key] raise RequestError(f"Failed to delete collection: {str(e)}") - - async def on_storage_response(self, response: StorageManagementResponse): - """ - Handle storage management responses for deletion tracking - - Args: - response: Storage management response - """ - logger.debug(f"Received storage response: error={response.error}") - - # Find matching deletion by checking all pending deletions - # Note: This is simplified correlation - in production we'd want better correlation - for deletion_key, info in list(self.pending_deletions.items()): - if info["responses_pending"] > 0: - # Record this response - info["responses_received"].append(response) - info["responses_pending"] -= 1 - - # Check if this response indicates failure - if response.error and 
response.error.message: - info["all_successful"] = False - info["error_messages"].append(response.error.message) - logger.warning(f"Storage operation failed for {deletion_key}: {response.error.message}") - else: - logger.debug(f"Storage operation succeeded for {deletion_key}") - - # If all responses received, signal completion - if info["responses_pending"] == 0: - logger.info(f"All storage responses received for {deletion_key}") - info["deletion_complete"].set() - - break # Only process for first matching deletion \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 00d64010..e3ec6977 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -18,9 +18,8 @@ from .. schema import LibrarianRequest, LibrarianResponse, Error from .. schema import librarian_request_queue, librarian_response_queue from .. schema import CollectionManagementRequest, CollectionManagementResponse from .. schema import collection_request_queue, collection_response_queue -from .. schema import StorageManagementRequest, StorageManagementResponse -from .. schema import vector_storage_management_topic, object_storage_management_topic -from .. schema import triples_storage_management_topic, storage_management_response_topic +from .. schema import ConfigRequest, ConfigResponse +from .. schema import config_request_queue, config_response_queue from .. schema import Document, Metadata from .. 
schema import TextDocument, Metadata @@ -39,6 +38,8 @@ default_librarian_request_queue = librarian_request_queue default_librarian_response_queue = librarian_response_queue default_collection_request_queue = collection_request_queue default_collection_response_queue = collection_response_queue +default_config_request_queue = config_request_queue +default_config_response_queue = config_response_queue default_minio_host = "minio:9000" default_minio_access_key = "minioadmin" @@ -47,9 +48,6 @@ default_cassandra_host = "cassandra" bucket_name = "library" -# FIXME: How to ensure this doesn't conflict with other usage? -keyspace = "librarian" - class Processor(AsyncProcessor): def __init__(self, **params): @@ -74,6 +72,14 @@ class Processor(AsyncProcessor): "collection_response_queue", default_collection_response_queue ) + config_request_queue = params.get( + "config_request_queue", default_config_request_queue + ) + + config_response_queue = params.get( + "config_response_queue", default_config_response_queue + ) + minio_host = params.get("minio_host", default_minio_host) minio_access_key = params.get( "minio_access_key", @@ -87,14 +93,15 @@ class Processor(AsyncProcessor): cassandra_host = params.get("cassandra_host") cassandra_username = params.get("cassandra_username") cassandra_password = params.get("cassandra_password") - + # Resolve configuration with environment variable fallback - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, - password=cassandra_password + password=cassandra_password, + default_keyspace="librarian" ) - + # Store resolved configuration self.cassandra_host = hosts self.cassandra_username = username @@ -170,34 +177,31 @@ class Processor(AsyncProcessor): metrics = collection_response_metrics, ) - # Storage management producers for collection deletion - self.vector_storage_producer = Producer( - client = self.pulsar_client, - 
topic = vector_storage_management_topic, - schema = StorageManagementRequest, + # Config service client for collection management + config_request_metrics = ProducerMetrics( + processor = id, flow = None, name = "config-request" ) - self.object_storage_producer = Producer( + self.config_request_producer = Producer( client = self.pulsar_client, - topic = object_storage_management_topic, - schema = StorageManagementRequest, + topic = config_request_queue, + schema = ConfigRequest, + metrics = config_request_metrics, ) - self.triples_storage_producer = Producer( - client = self.pulsar_client, - topic = triples_storage_management_topic, - schema = StorageManagementRequest, + config_response_metrics = ConsumerMetrics( + processor = id, flow = None, name = "config-response" ) - self.storage_response_consumer = Consumer( + self.config_response_consumer = Consumer( taskgroup = self.taskgroup, client = self.pulsar_client, flow = None, - topic = storage_management_response_topic, - subscriber = id, - schema = StorageManagementResponse, - handler = self.on_storage_response, - metrics = storage_response_metrics, + topic = config_response_queue, + subscriber = f"{id}-config", + schema = ConfigResponse, + handler = self.on_config_response, + metrics = config_response_metrics, ) self.librarian = Librarian( @@ -213,14 +217,9 @@ class Processor(AsyncProcessor): ) self.collection_manager = CollectionManager( - cassandra_host = self.cassandra_host, - cassandra_username = self.cassandra_username, - cassandra_password = self.cassandra_password, - keyspace = keyspace, - vector_storage_producer = self.vector_storage_producer, - object_storage_producer = self.object_storage_producer, - triples_storage_producer = self.triples_storage_producer, - storage_response_consumer = self.storage_response_consumer, + config_request_producer = self.config_request_producer, + config_response_consumer = self.config_response_consumer, + taskgroup = self.taskgroup, ) 
self.register_config_handler(self.on_librarian_config) @@ -236,10 +235,12 @@ class Processor(AsyncProcessor): await self.librarian_response_producer.start() await self.collection_request_consumer.start() await self.collection_response_producer.start() - await self.vector_storage_producer.start() - await self.object_storage_producer.start() - await self.triples_storage_producer.start() - await self.storage_response_consumer.start() + await self.config_request_producer.start() + await self.config_response_consumer.start() + + async def on_config_response(self, message, consumer, flow): + """Forward config responses to collection manager""" + await self.collection_manager.on_config_response(message, consumer, flow) async def on_librarian_config(self, config, version): @@ -464,14 +465,6 @@ class Processor(AsyncProcessor): logger.debug("Collection request processing complete") - async def on_storage_response(self, msg, consumer, flow): - """ - Handle storage management response messages - """ - v = msg.value() - logger.debug("Received storage management response") - await self.collection_manager.on_storage_response(v) - @staticmethod def add_args(parser): diff --git a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py index cf2757af..13726ac3 100755 --- a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py @@ -28,7 +28,7 @@ class Processor(TriplesQueryService): cassandra_password = params.get("cassandra_password") # Resolve configuration with environment variable fallback - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, password=cassandra_password diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py index 
012d91b7..07dbf0eb 100755 --- a/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/milvus/write.py @@ -6,11 +6,9 @@ Accepts entity/vector pairs and writes them to a Milvus store. import logging from .... direct.milvus_doc_embeddings import DocVectors -from .... base import DocumentEmbeddingsStoreService +from .... base import DocumentEmbeddingsStoreService, CollectionConfigHandler from .... base import AsyncProcessor, Consumer, Producer from .... base import ConsumerMetrics, ProducerMetrics -from .... schema import StorageManagementRequest, StorageManagementResponse, Error -from .... schema import vector_storage_management_topic, storage_management_response_topic # Module logger logger = logging.getLogger(__name__) @@ -18,7 +16,7 @@ logger = logging.getLogger(__name__) default_ident = "de-write" default_store_uri = 'http://localhost:19530' -class Processor(DocumentEmbeddingsStoreService): +class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService): def __init__(self, **params): @@ -32,51 +30,11 @@ class Processor(DocumentEmbeddingsStoreService): self.vecstore = DocVectors(store_uri) - # Set up metrics for storage management - storage_request_metrics = ConsumerMetrics( - processor=self.id, flow=None, name="storage-request" - ) - storage_response_metrics = ProducerMetrics( - processor=self.id, flow=None, name="storage-response" - ) - - # Set up consumer for storage management requests - self.storage_request_consumer = Consumer( - taskgroup=self.taskgroup, - client=self.pulsar_client, - flow=None, - topic=vector_storage_management_topic, - subscriber=f"{self.id}-storage", - schema=StorageManagementRequest, - handler=self.on_storage_management, - metrics=storage_request_metrics, - ) - - # Set up producer for storage management responses - self.storage_response_producer = Producer( - client=self.pulsar_client, - topic=storage_management_response_topic, - 
schema=StorageManagementResponse, - metrics=storage_response_metrics, - ) - - async def start(self): - """Start the processor and its storage management consumer""" - await super().start() - await self.storage_request_consumer.start() - await self.storage_response_producer.start() + # Register for config push notifications + self.register_config_handler(self.on_collection_config) async def store_document_embeddings(self, message): - # Validate collection exists before accepting writes - if not self.vecstore.collection_exists(message.metadata.user, message.metadata.collection): - error_msg = ( - f"Collection {message.metadata.collection} does not exist. " - f"Create it first with tg-set-collection." - ) - logger.error(error_msg) - raise ValueError(error_msg) - for emb in message.chunks: if emb.chunk is None or emb.chunk == b"": continue @@ -102,72 +60,27 @@ class Processor(DocumentEmbeddingsStoreService): help=f'Milvus store URI (default: {default_store_uri})' ) - async def on_storage_management(self, message, consumer, flow): - """Handle storage management requests""" - request = message.value() - logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}") - - try: - if request.operation == "create-collection": - await self.handle_create_collection(request) - elif request.operation == "delete-collection": - await self.handle_delete_collection(request) - else: - response = StorageManagementResponse( - error=Error( - type="invalid_operation", - message=f"Unknown operation: {request.operation}" - ) - ) - await self.storage_response_producer.send(response) - - except Exception as e: - logger.error(f"Error processing storage management request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="processing_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) - - async def handle_create_collection(self, request): + async def create_collection(self, user: str, 
collection: str, metadata: dict): """ - No-op for collection creation - collections are created lazily on first write + Create collection via config push - collections are created lazily on first write with the correct dimension determined from the actual embeddings. """ try: - logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write") - self.vecstore.create_collection(request.user, request.collection) - - # Send success response - response = StorageManagementResponse(error=None) - await self.storage_response_producer.send(response) + logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write") + self.vecstore.create_collection(user, collection) except Exception as e: - logger.error(f"Failed to handle create collection request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="creation_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) + logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True) + raise - async def handle_delete_collection(self, request): - """Delete the collection for document embeddings""" + async def delete_collection(self, user: str, collection: str): + """Delete the collection for document embeddings via config push""" try: - self.vecstore.delete_collection(request.user, request.collection) - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) - logger.info(f"Successfully deleted collection {request.user}/{request.collection}") + self.vecstore.delete_collection(user, collection) + logger.info(f"Successfully deleted collection {user}/{collection}") except Exception as e: - logger.error(f"Failed to delete collection: {e}") + logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True) raise def run(): diff --git 
a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py index 4d3c43bb..846855a4 100644 --- a/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/pinecone/write.py @@ -11,11 +11,9 @@ import uuid import os import logging -from .... base import DocumentEmbeddingsStoreService +from .... base import DocumentEmbeddingsStoreService, CollectionConfigHandler from .... base import AsyncProcessor, Consumer, Producer from .... base import ConsumerMetrics, ProducerMetrics -from .... schema import StorageManagementRequest, StorageManagementResponse, Error -from .... schema import vector_storage_management_topic, storage_management_response_topic # Module logger logger = logging.getLogger(__name__) @@ -25,7 +23,7 @@ default_api_key = os.getenv("PINECONE_API_KEY", "not-specified") default_cloud = "aws" default_region = "us-east-1" -class Processor(DocumentEmbeddingsStoreService): +class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService): def __init__(self, **params): @@ -59,33 +57,8 @@ class Processor(DocumentEmbeddingsStoreService): self.last_index_name = None - # Set up metrics for storage management - storage_request_metrics = ConsumerMetrics( - processor=self.id, flow=None, name="storage-request" - ) - storage_response_metrics = ProducerMetrics( - processor=self.id, flow=None, name="storage-response" - ) - - # Set up consumer for storage management requests - self.storage_request_consumer = Consumer( - taskgroup=self.taskgroup, - client=self.pulsar_client, - flow=None, - topic=vector_storage_management_topic, - subscriber=f"{self.id}-storage", - schema=StorageManagementRequest, - handler=self.on_storage_management, - metrics=storage_request_metrics, - ) - - # Set up producer for storage management responses - self.storage_response_producer = Producer( - client=self.pulsar_client, - 
topic=storage_management_response_topic, - schema=StorageManagementResponse, - metrics=storage_response_metrics, - ) + # Register for config push notifications + self.register_config_handler(self.on_collection_config) def create_index(self, index_name, dim): @@ -115,12 +88,6 @@ class Processor(DocumentEmbeddingsStoreService): "Gave up waiting for index creation" ) - async def start(self): - """Start the processor and its storage management consumer""" - await super().start() - await self.storage_request_consumer.start() - await self.storage_response_producer.start() - async def store_document_embeddings(self, message): for emb in message.chunks: @@ -188,65 +155,22 @@ class Processor(DocumentEmbeddingsStoreService): help=f'Pinecone region, (default: {default_region}' ) - async def on_storage_management(self, message, consumer, flow): - """Handle storage management requests""" - request = message.value() - logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}") - - try: - if request.operation == "create-collection": - await self.handle_create_collection(request) - elif request.operation == "delete-collection": - await self.handle_delete_collection(request) - else: - response = StorageManagementResponse( - error=Error( - type="invalid_operation", - message=f"Unknown operation: {request.operation}" - ) - ) - await self.storage_response_producer.send(response) - - except Exception as e: - logger.error(f"Error processing storage management request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="processing_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) - - async def handle_create_collection(self, request): + async def create_collection(self, user: str, collection: str, metadata: dict): """ - No-op for collection creation - indexes are created lazily on first write + Create collection via config push - indexes are created lazily on first write with 
the correct dimension determined from the actual embeddings. """ try: - logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write") - - # Send success response - response = StorageManagementResponse(error=None) - await self.storage_response_producer.send(response) + logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write") except Exception as e: - logger.error(f"Failed to handle create collection request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="creation_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) + logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True) + raise - async def handle_delete_collection(self, request): - """ - Delete all dimension variants of the index for document embeddings. - Since indexes are created with dimension suffixes (e.g., d-user-coll-384), - we need to find and delete all matching indexes. 
- """ + async def delete_collection(self, user: str, collection: str): + """Delete the collection for document embeddings via config push""" try: - prefix = f"d-{request.user}-{request.collection}-" + prefix = f"d-{user}-{collection}-" # Get all indexes and filter for matches all_indexes = self.pinecone.list_indexes() @@ -261,16 +185,10 @@ class Processor(DocumentEmbeddingsStoreService): for index_name in matching_indexes: self.pinecone.delete_index(index_name) logger.info(f"Deleted Pinecone index: {index_name}") - logger.info(f"Deleted {len(matching_indexes)} index(es) for {request.user}/{request.collection}") - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) + logger.info(f"Deleted {len(matching_indexes)} index(es) for {user}/{collection}") except Exception as e: - logger.error(f"Failed to delete collection: {e}") + logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True) raise def run(): diff --git a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py index 225beb9c..923ade10 100644 --- a/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py +++ b/trustgraph-flow/trustgraph/storage/doc_embeddings/qdrant/write.py @@ -9,11 +9,9 @@ from qdrant_client.models import Distance, VectorParams import uuid import logging -from .... base import DocumentEmbeddingsStoreService +from .... base import DocumentEmbeddingsStoreService, CollectionConfigHandler from .... base import AsyncProcessor, Consumer, Producer from .... base import ConsumerMetrics, ProducerMetrics -from .... schema import StorageManagementRequest, StorageManagementResponse, Error -from .... 
schema import vector_storage_management_topic, storage_management_response_topic # Module logger logger = logging.getLogger(__name__) @@ -22,7 +20,7 @@ default_ident = "de-write" default_store_uri = 'http://localhost:6333' -class Processor(DocumentEmbeddingsStoreService): +class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService): def __init__(self, **params): @@ -38,44 +36,8 @@ class Processor(DocumentEmbeddingsStoreService): self.qdrant = QdrantClient(url=store_uri, api_key=api_key) - # Set up storage management if base class attributes are available - # (they may not be in unit tests) - if hasattr(self, 'id') and hasattr(self, 'taskgroup') and hasattr(self, 'pulsar_client'): - # Set up metrics for storage management - storage_request_metrics = ConsumerMetrics( - processor=self.id, flow=None, name="storage-request" - ) - storage_response_metrics = ProducerMetrics( - processor=self.id, flow=None, name="storage-response" - ) - - # Set up consumer for storage management requests - self.storage_request_consumer = Consumer( - taskgroup=self.taskgroup, - client=self.pulsar_client, - flow=None, - topic=vector_storage_management_topic, - subscriber=f"{self.id}-storage", - schema=StorageManagementRequest, - handler=self.on_storage_management, - metrics=storage_request_metrics, - ) - - # Set up producer for storage management responses - self.storage_response_producer = Producer( - client=self.pulsar_client, - topic=storage_management_response_topic, - schema=StorageManagementResponse, - metrics=storage_response_metrics, - ) - - async def start(self): - """Start the processor and its storage management consumer""" - await super().start() - if hasattr(self, 'storage_request_consumer'): - await self.storage_request_consumer.start() - if hasattr(self, 'storage_response_producer'): - await self.storage_response_producer.start() + # Register for config push notifications + self.register_config_handler(self.on_collection_config) async def 
store_document_embeddings(self, message): @@ -133,65 +95,22 @@ class Processor(DocumentEmbeddingsStoreService): help=f'Qdrant API key (default: None)' ) - async def on_storage_management(self, message, consumer, flow): - """Handle storage management requests""" - request = message.value() - logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}") - - try: - if request.operation == "create-collection": - await self.handle_create_collection(request) - elif request.operation == "delete-collection": - await self.handle_delete_collection(request) - else: - response = StorageManagementResponse( - error=Error( - type="invalid_operation", - message=f"Unknown operation: {request.operation}" - ) - ) - await self.storage_response_producer.send(response) - - except Exception as e: - logger.error(f"Error processing storage management request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="processing_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) - - async def handle_create_collection(self, request): + async def create_collection(self, user: str, collection: str, metadata: dict): """ - No-op for collection creation - collections are created lazily on first write + Create collection via config push - collections are created lazily on first write with the correct dimension determined from the actual embeddings. 
""" try: - logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write") - - # Send success response - response = StorageManagementResponse(error=None) - await self.storage_response_producer.send(response) + logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write") except Exception as e: - logger.error(f"Failed to handle create collection request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="creation_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) + logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True) + raise - async def handle_delete_collection(self, request): - """ - Delete all dimension variants of the collection for document embeddings. - Since collections are created with dimension suffixes (e.g., d_user_coll_384), - we need to find and delete all matching collections. 
- """ + async def delete_collection(self, user: str, collection: str): + """Delete the collection for document embeddings via config push""" try: - prefix = f"d_{request.user}_{request.collection}_" + prefix = f"d_{user}_{collection}_" # Get all collections and filter for matches all_collections = self.qdrant.get_collections().collections @@ -206,16 +125,10 @@ class Processor(DocumentEmbeddingsStoreService): for collection_name in matching_collections: self.qdrant.delete_collection(collection_name) logger.info(f"Deleted Qdrant collection: {collection_name}") - logger.info(f"Deleted {len(matching_collections)} collection(s) for {request.user}/{request.collection}") - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) + logger.info(f"Deleted {len(matching_collections)} collection(s) for {user}/{collection}") except Exception as e: - logger.error(f"Failed to delete collection: {e}") + logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True) raise def run(): diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py index cca0de95..2e192cd6 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py @@ -6,11 +6,9 @@ Accepts entity/vector pairs and writes them to a Milvus store. import logging from .... direct.milvus_graph_embeddings import EntityVectors -from .... base import GraphEmbeddingsStoreService +from .... base import GraphEmbeddingsStoreService, CollectionConfigHandler from .... base import AsyncProcessor, Consumer, Producer from .... base import ConsumerMetrics, ProducerMetrics -from .... schema import StorageManagementRequest, StorageManagementResponse, Error -from .... 
schema import vector_storage_management_topic, storage_management_response_topic # Module logger logger = logging.getLogger(__name__) @@ -18,7 +16,7 @@ logger = logging.getLogger(__name__) default_ident = "ge-write" default_store_uri = 'http://localhost:19530' -class Processor(GraphEmbeddingsStoreService): +class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): def __init__(self, **params): @@ -32,51 +30,11 @@ class Processor(GraphEmbeddingsStoreService): self.vecstore = EntityVectors(store_uri) - # Set up metrics for storage management - storage_request_metrics = ConsumerMetrics( - processor=self.id, flow=None, name="storage-request" - ) - storage_response_metrics = ProducerMetrics( - processor=self.id, flow=None, name="storage-response" - ) - - # Set up consumer for storage management requests - self.storage_request_consumer = Consumer( - taskgroup=self.taskgroup, - client=self.pulsar_client, - flow=None, - topic=vector_storage_management_topic, - subscriber=f"{self.id}-storage", - schema=StorageManagementRequest, - handler=self.on_storage_management, - metrics=storage_request_metrics, - ) - - # Set up producer for storage management responses - self.storage_response_producer = Producer( - client=self.pulsar_client, - topic=storage_management_response_topic, - schema=StorageManagementResponse, - metrics=storage_response_metrics, - ) - - async def start(self): - """Start the processor and its storage management consumer""" - await super().start() - await self.storage_request_consumer.start() - await self.storage_response_producer.start() + # Register for config push notifications + self.register_config_handler(self.on_collection_config) async def store_graph_embeddings(self, message): - # Validate collection exists before accepting writes - if not self.vecstore.collection_exists(message.metadata.user, message.metadata.collection): - error_msg = ( - f"Collection {message.metadata.collection} does not exist. 
" - f"Create it first with tg-set-collection." - ) - logger.error(error_msg) - raise ValueError(error_msg) - for entity in message.entities: if entity.entity.value != "" and entity.entity.value is not None: @@ -98,72 +56,27 @@ class Processor(GraphEmbeddingsStoreService): help=f'Milvus store URI (default: {default_store_uri})' ) - async def on_storage_management(self, message, consumer, flow): - """Handle storage management requests""" - request = message.value() - logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}") - - try: - if request.operation == "create-collection": - await self.handle_create_collection(request) - elif request.operation == "delete-collection": - await self.handle_delete_collection(request) - else: - response = StorageManagementResponse( - error=Error( - type="invalid_operation", - message=f"Unknown operation: {request.operation}" - ) - ) - await self.storage_response_producer.send(response) - - except Exception as e: - logger.error(f"Error processing storage management request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="processing_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) - - async def handle_create_collection(self, request): + async def create_collection(self, user: str, collection: str, metadata: dict): """ - No-op for collection creation - collections are created lazily on first write + Create collection via config push - collections are created lazily on first write with the correct dimension determined from the actual embeddings. 
""" try: - logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write") - self.vecstore.create_collection(request.user, request.collection) - - # Send success response - response = StorageManagementResponse(error=None) - await self.storage_response_producer.send(response) + logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write") + self.vecstore.create_collection(user, collection) except Exception as e: - logger.error(f"Failed to handle create collection request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="creation_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) + logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True) + raise - async def handle_delete_collection(self, request): - """Delete the collection for graph embeddings""" + async def delete_collection(self, user: str, collection: str): + """Delete the collection for graph embeddings via config push""" try: - self.vecstore.delete_collection(request.user, request.collection) - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) - logger.info(f"Successfully deleted collection {request.user}/{request.collection}") + self.vecstore.delete_collection(user, collection) + logger.info(f"Successfully deleted collection {user}/{collection}") except Exception as e: - logger.error(f"Failed to delete collection: {e}") + logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True) raise def run(): diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py index 30d3d3e5..9fe38720 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py +++ 
b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py @@ -11,11 +11,9 @@ import uuid import os import logging -from .... base import GraphEmbeddingsStoreService +from .... base import GraphEmbeddingsStoreService, CollectionConfigHandler from .... base import AsyncProcessor, Consumer, Producer from .... base import ConsumerMetrics, ProducerMetrics -from .... schema import StorageManagementRequest, StorageManagementResponse, Error -from .... schema import vector_storage_management_topic, storage_management_response_topic # Module logger logger = logging.getLogger(__name__) @@ -25,7 +23,7 @@ default_api_key = os.getenv("PINECONE_API_KEY", "not-specified") default_cloud = "aws" default_region = "us-east-1" -class Processor(GraphEmbeddingsStoreService): +class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): def __init__(self, **params): @@ -59,33 +57,8 @@ class Processor(GraphEmbeddingsStoreService): self.last_index_name = None - # Set up metrics for storage management - storage_request_metrics = ConsumerMetrics( - processor=self.id, flow=None, name="storage-request" - ) - storage_response_metrics = ProducerMetrics( - processor=self.id, flow=None, name="storage-response" - ) - - # Set up consumer for storage management requests - self.storage_request_consumer = Consumer( - taskgroup=self.taskgroup, - client=self.pulsar_client, - flow=None, - topic=vector_storage_management_topic, - subscriber=f"{self.id}-storage", - schema=StorageManagementRequest, - handler=self.on_storage_management, - metrics=storage_request_metrics, - ) - - # Set up producer for storage management responses - self.storage_response_producer = Producer( - client=self.pulsar_client, - topic=storage_management_response_topic, - schema=StorageManagementResponse, - metrics=storage_response_metrics, - ) + # Register for config push notifications + self.register_config_handler(self.on_collection_config) def create_index(self, index_name, dim): @@ -115,12 +88,6 @@ class 
Processor(GraphEmbeddingsStoreService): "Gave up waiting for index creation" ) - async def start(self): - """Start the processor and its storage management consumer""" - await super().start() - await self.storage_request_consumer.start() - await self.storage_response_producer.start() - async def store_graph_embeddings(self, message): for entity in message.entities: @@ -186,65 +153,22 @@ class Processor(GraphEmbeddingsStoreService): help=f'Pinecone region, (default: {default_region}' ) - async def on_storage_management(self, message, consumer, flow): - """Handle storage management requests""" - request = message.value() - logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}") - - try: - if request.operation == "create-collection": - await self.handle_create_collection(request) - elif request.operation == "delete-collection": - await self.handle_delete_collection(request) - else: - response = StorageManagementResponse( - error=Error( - type="invalid_operation", - message=f"Unknown operation: {request.operation}" - ) - ) - await self.storage_response_producer.send(response) - - except Exception as e: - logger.error(f"Error processing storage management request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="processing_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) - - async def handle_create_collection(self, request): + async def create_collection(self, user: str, collection: str, metadata: dict): """ - No-op for collection creation - indexes are created lazily on first write + Create collection via config push - indexes are created lazily on first write with the correct dimension determined from the actual embeddings. 
""" try: - logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write") - - # Send success response - response = StorageManagementResponse(error=None) - await self.storage_response_producer.send(response) + logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write") except Exception as e: - logger.error(f"Failed to handle create collection request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="creation_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) + logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True) + raise - async def handle_delete_collection(self, request): - """ - Delete all dimension variants of the index for graph embeddings. - Since indexes are created with dimension suffixes (e.g., t-user-coll-384), - we need to find and delete all matching indexes. - """ + async def delete_collection(self, user: str, collection: str): + """Delete the collection for graph embeddings via config push""" try: - prefix = f"t-{request.user}-{request.collection}-" + prefix = f"t-{user}-{collection}-" # Get all indexes and filter for matches all_indexes = self.pinecone.list_indexes() @@ -259,16 +183,10 @@ class Processor(GraphEmbeddingsStoreService): for index_name in matching_indexes: self.pinecone.delete_index(index_name) logger.info(f"Deleted Pinecone index: {index_name}") - logger.info(f"Deleted {len(matching_indexes)} index(es) for {request.user}/{request.collection}") - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) + logger.info(f"Deleted {len(matching_indexes)} index(es) for {user}/{collection}") except Exception as e: - logger.error(f"Failed to delete collection: {e}") + logger.error(f"Failed to delete collection {user}/{collection}: {e}", 
exc_info=True) raise def run(): diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py index 0b15996f..39127473 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py @@ -9,11 +9,9 @@ from qdrant_client.models import Distance, VectorParams import uuid import logging -from .... base import GraphEmbeddingsStoreService +from .... base import GraphEmbeddingsStoreService, CollectionConfigHandler from .... base import AsyncProcessor, Consumer, Producer from .... base import ConsumerMetrics, ProducerMetrics -from .... schema import StorageManagementRequest, StorageManagementResponse, Error -from .... schema import vector_storage_management_topic, storage_management_response_topic # Module logger logger = logging.getLogger(__name__) @@ -22,7 +20,7 @@ default_ident = "ge-write" default_store_uri = 'http://localhost:6333' -class Processor(GraphEmbeddingsStoreService): +class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): def __init__(self, **params): @@ -38,44 +36,8 @@ class Processor(GraphEmbeddingsStoreService): self.qdrant = QdrantClient(url=store_uri, api_key=api_key) - # Set up storage management if base class attributes are available - # (they may not be in unit tests) - if hasattr(self, 'id') and hasattr(self, 'taskgroup') and hasattr(self, 'pulsar_client'): - # Set up metrics for storage management - storage_request_metrics = ConsumerMetrics( - processor=self.id, flow=None, name="storage-request" - ) - storage_response_metrics = ProducerMetrics( - processor=self.id, flow=None, name="storage-response" - ) - - # Set up consumer for storage management requests - self.storage_request_consumer = Consumer( - taskgroup=self.taskgroup, - client=self.pulsar_client, - flow=None, - topic=vector_storage_management_topic, - subscriber=f"{self.id}-storage", - 
schema=StorageManagementRequest, - handler=self.on_storage_management, - metrics=storage_request_metrics, - ) - - # Set up producer for storage management responses - self.storage_response_producer = Producer( - client=self.pulsar_client, - topic=storage_management_response_topic, - schema=StorageManagementResponse, - metrics=storage_response_metrics, - ) - - async def start(self): - """Start the processor and its storage management consumer""" - await super().start() - if hasattr(self, 'storage_request_consumer'): - await self.storage_request_consumer.start() - if hasattr(self, 'storage_response_producer'): - await self.storage_response_producer.start() + # Register for config push notifications + self.register_config_handler(self.on_collection_config) async def store_graph_embeddings(self, message): @@ -132,65 +94,22 @@ class Processor(GraphEmbeddingsStoreService): help=f'Qdrant API key' ) - async def on_storage_management(self, message, consumer, flow): - """Handle storage management requests""" - request = message.value() - logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}") - - try: - if request.operation == "create-collection": - await self.handle_create_collection(request) - elif request.operation == "delete-collection": - await self.handle_delete_collection(request) - else: - response = StorageManagementResponse( - error=Error( - type="invalid_operation", - message=f"Unknown operation: {request.operation}" - ) - ) - await self.storage_response_producer.send(response) - - except Exception as e: - logger.error(f"Error processing storage management request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="processing_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) - - async def handle_create_collection(self, request): + async def create_collection(self, user: str, collection: str, metadata: dict): """ - No-op for collection creation - 
collections are created lazily on first write + Create collection via config push - collections are created lazily on first write with the correct dimension determined from the actual embeddings. """ try: - logger.info(f"Collection create request for {request.user}/{request.collection} - will be created lazily on first write") - - # Send success response - response = StorageManagementResponse(error=None) - await self.storage_response_producer.send(response) + logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write") except Exception as e: - logger.error(f"Failed to handle create collection request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="creation_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) + logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True) + raise - async def handle_delete_collection(self, request): - """ - Delete all dimension variants of the collection for graph embeddings. - Since collections are created with dimension suffixes (e.g., t_user_coll_384), - we need to find and delete all matching collections. 
- """ + async def delete_collection(self, user: str, collection: str): + """Delete the collection for graph embeddings via config push""" try: - prefix = f"t_{request.user}_{request.collection}_" + prefix = f"t_{user}_{collection}_" # Get all collections and filter for matches all_collections = self.qdrant.get_collections().collections @@ -205,16 +124,10 @@ class Processor(GraphEmbeddingsStoreService): for collection_name in matching_collections: self.qdrant.delete_collection(collection_name) logger.info(f"Deleted Qdrant collection: {collection_name}") - logger.info(f"Deleted {len(matching_collections)} collection(s) for {request.user}/{request.collection}") - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) + logger.info(f"Deleted {len(matching_collections)} collection(s) for {user}/{collection}") except Exception as e: - logger.error(f"Failed to delete collection: {e}") + logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True) raise def run(): diff --git a/trustgraph-flow/trustgraph/storage/knowledge/store.py b/trustgraph-flow/trustgraph/storage/knowledge/store.py index b39fe09f..a79b7b83 100644 --- a/trustgraph-flow/trustgraph/storage/knowledge/store.py +++ b/trustgraph-flow/trustgraph/storage/knowledge/store.py @@ -23,10 +23,11 @@ class Processor(FlowProcessor): id = params.get("id") # Use helper to resolve configuration - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host=params.get("cassandra_host"), username=params.get("cassandra_username"), - password=params.get("cassandra_password") + password=params.get("cassandra_password"), + default_keyspace='knowledge' ) super(Processor, self).__init__( diff --git a/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py b/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py index 
e9dda4d6..c5b8af06 100644 --- a/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/objects/cassandra/write.py @@ -35,7 +35,7 @@ class Processor(FlowProcessor): cassandra_password = params.get("cassandra_password") # Resolve configuration with environment variable fallback - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, password=cassandra_password @@ -55,7 +55,7 @@ class Processor(FlowProcessor): "config_type": self.config_key, } ) - + self.register_specification( ConsumerSpec( name = "input", @@ -341,13 +341,6 @@ class Processor(FlowProcessor): except Exception as e: logger.warning(f"Failed to convert value {value} to type {field_type}: {e}") return str(value) - - async def start(self): - """Start the processor and its storage management consumer""" - await super().start() - await self.storage_request_consumer.start() - await self.storage_response_producer.start() - async def on_object(self, msg, consumer, flow): """Process incoming ExtractedObject and store in Cassandra""" @@ -368,7 +361,7 @@ class Processor(FlowProcessor): if result is None or not result.one(): error_msg = ( f"Collection {obj.metadata.collection} does not exist. " - f"Create it first with tg-set-collection." + f"Create it first via collection management API." ) logger.error(error_msg) raise ValueError(error_msg) diff --git a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py index 6497f95c..b9b42375 100755 --- a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py @@ -11,12 +11,10 @@ import time import logging from .... direct.cassandra_kg import KnowledgeGraph -from .... base import TriplesStoreService +from .... 
base import TriplesStoreService, CollectionConfigHandler from .... base import AsyncProcessor, Consumer, Producer from .... base import ConsumerMetrics, ProducerMetrics from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config -from .... schema import StorageManagementRequest, StorageManagementResponse, Error -from .... schema import triples_storage_management_topic, storage_management_response_topic # Module logger logger = logging.getLogger(__name__) @@ -24,10 +22,10 @@ logger = logging.getLogger(__name__) default_ident = "triples-write" -class Processor(TriplesStoreService): +class Processor(CollectionConfigHandler, TriplesStoreService): def __init__(self, **params): - + id = params.get("id", default_ident) # Get Cassandra parameters @@ -36,7 +34,7 @@ class Processor(TriplesStoreService): cassandra_password = params.get("cassandra_password") # Resolve configuration with environment variable fallback - hosts, username, password = resolve_cassandra_config( + hosts, username, password, keyspace = resolve_cassandra_config( host=cassandra_host, username=cassandra_username, password=cassandra_password @@ -48,39 +46,15 @@ class Processor(TriplesStoreService): "cassandra_username": username } ) - + self.cassandra_host = hosts self.cassandra_username = username self.cassandra_password = password self.table = None + self.tg = None - # Set up metrics for storage management - storage_request_metrics = ConsumerMetrics( - processor=self.id, flow=None, name="storage-request" - ) - storage_response_metrics = ProducerMetrics( - processor=self.id, flow=None, name="storage-response" - ) - - # Set up consumer for storage management requests - self.storage_request_consumer = Consumer( - taskgroup=self.taskgroup, - client=self.pulsar_client, - flow=None, - topic=triples_storage_management_topic, - subscriber=f"{id}-storage", - schema=StorageManagementRequest, - handler=self.on_storage_management, - metrics=storage_request_metrics, - ) - - # Set up producer 
for storage management responses - self.storage_response_producer = Producer( - client=self.pulsar_client, - topic=storage_management_response_topic, - schema=StorageManagementResponse, - metrics=storage_response_metrics, - ) + # Register for config push notifications + self.register_config_handler(self.on_collection_config) async def store_triples(self, message): @@ -109,15 +83,6 @@ class Processor(TriplesStoreService): self.table = user - # Validate collection exists before accepting writes - if not self.tg.collection_exists(message.metadata.collection): - error_msg = ( - f"Collection {message.metadata.collection} does not exist. " - f"Create it first with tg-set-collection." - ) - logger.error(error_msg) - raise ValueError(error_msg) - for t in message.triples: self.tg.insert( message.metadata.collection, @@ -126,133 +91,77 @@ class Processor(TriplesStoreService): t.o.value ) - async def start(self): - """Start the processor and its storage management consumer""" - await super().start() - await self.storage_request_consumer.start() - await self.storage_response_producer.start() - - async def on_storage_management(self, message, consumer, flow): - """Handle storage management requests""" - request = message.value() - logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}") - - try: - if request.operation == "create-collection": - await self.handle_create_collection(request) - elif request.operation == "delete-collection": - await self.handle_delete_collection(request) - else: - response = StorageManagementResponse( - error=Error( - type="invalid_operation", - message=f"Unknown operation: {request.operation}" - ) - ) - await self.storage_response_producer.send(response) - - except Exception as e: - logger.error(f"Error processing storage management request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="processing_error", - message=str(e) - ) - ) - await 
self.storage_response_producer.send(response) - - async def handle_create_collection(self, request): - """Create a collection in Cassandra triple store""" + async def create_collection(self, user: str, collection: str, metadata: dict): + """Create a collection in Cassandra triple store via config push""" try: # Create or reuse connection for this user's keyspace - if self.table is None or self.table != request.user: + if self.table is None or self.table != user: self.tg = None try: if self.cassandra_username and self.cassandra_password: self.tg = KnowledgeGraph( hosts=self.cassandra_host, - keyspace=request.user, + keyspace=user, username=self.cassandra_username, password=self.cassandra_password ) else: self.tg = KnowledgeGraph( hosts=self.cassandra_host, - keyspace=request.user, + keyspace=user, ) except Exception as e: - logger.error(f"Failed to connect to Cassandra for user {request.user}: {e}") + logger.error(f"Failed to connect to Cassandra for user {user}: {e}") raise - self.table = request.user + self.table = user # Create collection using the built-in method - logger.info(f"Creating collection {request.collection} for user {request.user}") + logger.info(f"Creating collection {collection} for user {user}") - if self.tg.collection_exists(request.collection): - logger.info(f"Collection {request.collection} already exists") + if self.tg.collection_exists(collection): + logger.info(f"Collection {collection} already exists") else: - self.tg.create_collection(request.collection) - logger.info(f"Created collection {request.collection}") - - # Send success response - response = StorageManagementResponse(error=None) - await self.storage_response_producer.send(response) + self.tg.create_collection(collection) + logger.info(f"Created collection {collection}") except Exception as e: - logger.error(f"Failed to create collection: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="creation_error", - message=str(e) - ) - ) - await 
self.storage_response_producer.send(response) + logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True) + raise - async def handle_delete_collection(self, request): + async def delete_collection(self, user: str, collection: str): """Delete all data for a specific collection from the unified triples table""" try: # Create or reuse connection for this user's keyspace - if self.table is None or self.table != request.user: + if self.table is None or self.table != user: self.tg = None try: if self.cassandra_username and self.cassandra_password: self.tg = KnowledgeGraph( hosts=self.cassandra_host, - keyspace=request.user, + keyspace=user, username=self.cassandra_username, password=self.cassandra_password ) else: self.tg = KnowledgeGraph( hosts=self.cassandra_host, - keyspace=request.user, + keyspace=user, ) except Exception as e: - logger.error(f"Failed to connect to Cassandra for user {request.user}: {e}") + logger.error(f"Failed to connect to Cassandra for user {user}: {e}") raise - self.table = request.user + self.table = user # Delete all triples for this collection using the built-in method - try: - self.tg.delete_collection(request.collection) - logger.info(f"Deleted all triples for collection {request.collection} from keyspace {request.user}") - except Exception as e: - logger.error(f"Failed to delete collection data: {e}") - raise - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) - logger.info(f"Successfully deleted collection {request.user}/{request.collection}") + self.tg.delete_collection(collection) + logger.info(f"Deleted all triples for collection {collection} from keyspace {user}") except Exception as e: - logger.error(f"Failed to delete collection: {e}") + logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True) raise @staticmethod diff --git 
a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py index d0800b67..f08eeb91 100755 --- a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py @@ -12,11 +12,9 @@ import logging from falkordb import FalkorDB -from .... base import TriplesStoreService +from .... base import TriplesStoreService, CollectionConfigHandler from .... base import AsyncProcessor, Consumer, Producer from .... base import ConsumerMetrics, ProducerMetrics -from .... schema import StorageManagementRequest, StorageManagementResponse, Error -from .... schema import triples_storage_management_topic, storage_management_response_topic # Module logger logger = logging.getLogger(__name__) @@ -26,10 +24,10 @@ default_ident = "triples-write" default_graph_url = 'falkor://falkordb:6379' default_database = 'falkordb' -class Processor(TriplesStoreService): +class Processor(CollectionConfigHandler, TriplesStoreService): def __init__(self, **params): - + graph_url = params.get("graph_url", default_graph_url) database = params.get("database", default_database) @@ -44,33 +42,8 @@ class Processor(TriplesStoreService): self.io = FalkorDB.from_url(graph_url).select_graph(database) - # Set up metrics for storage management - storage_request_metrics = ConsumerMetrics( - processor=self.id, flow=None, name="storage-request" - ) - storage_response_metrics = ProducerMetrics( - processor=self.id, flow=None, name="storage-response" - ) - - # Set up consumer for storage management requests - self.storage_request_consumer = Consumer( - taskgroup=self.taskgroup, - client=self.pulsar_client, - flow=None, - topic=triples_storage_management_topic, - subscriber=f"{self.id}-storage", - schema=StorageManagementRequest, - handler=self.on_storage_management, - metrics=storage_request_metrics, - ) - - # Set up producer for storage management responses - self.storage_response_producer = 
Producer( - client=self.pulsar_client, - topic=storage_management_response_topic, - schema=StorageManagementResponse, - metrics=storage_response_metrics, - ) + # Register for config push notifications + self.register_config_handler(self.on_collection_config) def create_node(self, uri, user, collection): @@ -184,7 +157,7 @@ class Processor(TriplesStoreService): if not self.collection_exists(user, collection): error_msg = ( f"Collection {collection} does not exist. " - f"Create it first with tg-set-collection." + f"Create it first via collection management API." ) logger.error(error_msg) raise ValueError(error_msg) @@ -217,95 +190,58 @@ class Processor(TriplesStoreService): help=f'FalkorDB database (default: {default_database})' ) - async def start(self): - """Start the processor and its storage management consumer""" - await super().start() - await self.storage_request_consumer.start() - await self.storage_response_producer.start() - - async def on_storage_management(self, message, consumer, flow): - """Handle storage management requests""" - request = message.value() - logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}") - + async def create_collection(self, user: str, collection: str, metadata: dict): + """Create collection metadata in FalkorDB via config push""" try: - if request.operation == "create-collection": - await self.handle_create_collection(request) - elif request.operation == "delete-collection": - await self.handle_delete_collection(request) + # Check if collection exists + result = self.io.query( + "MATCH (c:CollectionMetadata {user: $user, collection: $collection}) RETURN c LIMIT 1", + params={"user": user, "collection": collection} + ) + if result.result_set: + logger.info(f"Collection {user}/{collection} already exists") else: - response = StorageManagementResponse( - error=Error( - type="invalid_operation", - message=f"Unknown operation: {request.operation}" - ) + # Create collection metadata 
node + import datetime + self.io.query( + "MERGE (c:CollectionMetadata {user: $user, collection: $collection}) " + "SET c.created_at = $created_at", + params={ + "user": user, + "collection": collection, + "created_at": datetime.datetime.now().isoformat() + } ) - await self.storage_response_producer.send(response) + logger.info(f"Created collection {user}/{collection}") except Exception as e: - logger.error(f"Error processing storage management request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="processing_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) + logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True) + raise - async def handle_create_collection(self, request): - """Create collection metadata in FalkorDB""" - try: - if self.collection_exists(request.user, request.collection): - logger.info(f"Collection {request.user}/{request.collection} already exists") - else: - self.create_collection(request.user, request.collection) - logger.info(f"Created collection {request.user}/{request.collection}") - - # Send success response - response = StorageManagementResponse(error=None) - await self.storage_response_producer.send(response) - - except Exception as e: - logger.error(f"Failed to create collection: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="creation_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) - - async def handle_delete_collection(self, request): - """Delete the collection for FalkorDB triples""" + async def delete_collection(self, user: str, collection: str): + """Delete the collection for FalkorDB triples via config push""" try: # Delete all nodes and literals for this user/collection node_result = self.io.query( "MATCH (n:Node {user: $user, collection: $collection}) DETACH DELETE n", - params={"user": request.user, "collection": request.collection} + params={"user": 
user, "collection": collection} ) literal_result = self.io.query( "MATCH (n:Literal {user: $user, collection: $collection}) DETACH DELETE n", - params={"user": request.user, "collection": request.collection} + params={"user": user, "collection": collection} ) # Delete collection metadata node metadata_result = self.io.query( "MATCH (c:CollectionMetadata {user: $user, collection: $collection}) DELETE c", - params={"user": request.user, "collection": request.collection} + params={"user": user, "collection": collection} ) - logger.info(f"Deleted {node_result.nodes_deleted} nodes, {literal_result.nodes_deleted} literals, and {metadata_result.nodes_deleted} metadata nodes for collection {request.user}/{request.collection}") - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) - logger.info(f"Successfully deleted collection {request.user}/{request.collection}") + logger.info(f"Deleted {node_result.nodes_deleted} nodes, {literal_result.nodes_deleted} literals, and {metadata_result.nodes_deleted} metadata nodes for collection {user}/{collection}") except Exception as e: - logger.error(f"Failed to delete collection: {e}") + logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True) raise def run(): diff --git a/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py index 84248952..8105b14e 100755 --- a/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/memgraph/write.py @@ -12,11 +12,9 @@ import logging from neo4j import GraphDatabase -from .... base import TriplesStoreService +from .... base import TriplesStoreService, CollectionConfigHandler from .... base import AsyncProcessor, Consumer, Producer from .... base import ConsumerMetrics, ProducerMetrics -from .... 
schema import StorageManagementRequest, StorageManagementResponse, Error -from .... schema import triples_storage_management_topic, storage_management_response_topic # Module logger logger = logging.getLogger(__name__) @@ -28,10 +26,10 @@ default_username = 'memgraph' default_password = 'password' default_database = 'memgraph' -class Processor(TriplesStoreService): +class Processor(CollectionConfigHandler, TriplesStoreService): def __init__(self, **params): - + graph_host = params.get("graph_host", default_graph_host) username = params.get("username", default_username) password = params.get("password", default_password) @@ -53,33 +51,8 @@ class Processor(TriplesStoreService): with self.io.session(database=self.db) as session: self.create_indexes(session) - # Set up metrics for storage management - storage_request_metrics = ConsumerMetrics( - processor=self.id, flow=None, name="storage-request" - ) - storage_response_metrics = ProducerMetrics( - processor=self.id, flow=None, name="storage-response" - ) - - # Set up consumer for storage management requests - self.storage_request_consumer = Consumer( - taskgroup=self.taskgroup, - client=self.pulsar_client, - flow=None, - topic=triples_storage_management_topic, - subscriber=f"{self.id}-storage", - schema=StorageManagementRequest, - handler=self.on_storage_management, - metrics=storage_request_metrics, - ) - - # Set up producer for storage management responses - self.storage_response_producer = Producer( - client=self.pulsar_client, - topic=storage_management_response_topic, - schema=StorageManagementResponse, - metrics=storage_response_metrics, - ) + # Register for config push notifications + self.register_config_handler(self.on_collection_config) def create_indexes(self, session): @@ -267,28 +240,6 @@ class Processor(TriplesStoreService): src=t.s.value, dest=t.o.value, uri=t.p.value, user=user, collection=collection, ) - def collection_exists(self, user, collection): - """Check if collection metadata node exists""" - 
with self.io.session(database=self.db) as session: - result = session.run( - "MATCH (c:CollectionMetadata {user: $user, collection: $collection}) " - "RETURN c LIMIT 1", - user=user, collection=collection - ) - return bool(list(result)) - - def create_collection(self, user, collection): - """Create collection metadata node""" - import datetime - with self.io.session(database=self.db) as session: - session.run( - "MERGE (c:CollectionMetadata {user: $user, collection: $collection}) " - "SET c.created_at = $created_at", - user=user, collection=collection, - created_at=datetime.datetime.now().isoformat() - ) - logger.info(f"Created collection metadata node for {user}/{collection}") - async def store_triples(self, message): # Extract user and collection from metadata @@ -299,7 +250,7 @@ class Processor(TriplesStoreService): if not self.collection_exists(user, collection): error_msg = ( f"Collection {collection} does not exist. " - f"Create it first with tg-set-collection." + f"Create it first via collection management API." 
) logger.error(error_msg) raise ValueError(error_msg) @@ -348,73 +299,50 @@ class Processor(TriplesStoreService): help=f'Memgraph database (default: {default_database})' ) - async def start(self): - """Start the processor and its storage management consumer""" - await super().start() - await self.storage_request_consumer.start() - await self.storage_response_producer.start() + def _collection_exists_in_db(self, user, collection): + """Check if collection metadata node exists""" + with self.io.session(database=self.db) as session: + result = session.run( + "MATCH (c:CollectionMetadata {user: $user, collection: $collection}) " + "RETURN c LIMIT 1", + user=user, collection=collection + ) + return bool(list(result)) - async def on_storage_management(self, message, consumer, flow): - """Handle storage management requests""" - request = message.value() - logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}") + def _create_collection_in_db(self, user, collection): + """Create collection metadata node""" + import datetime + with self.io.session(database=self.db) as session: + session.run( + "MERGE (c:CollectionMetadata {user: $user, collection: $collection}) " + "SET c.created_at = $created_at", + user=user, collection=collection, + created_at=datetime.datetime.now().isoformat() + ) + logger.info(f"Created collection metadata node for {user}/{collection}") + async def create_collection(self, user: str, collection: str, metadata: dict): + """Create collection metadata in Memgraph via config push""" try: - if request.operation == "create-collection": - await self.handle_create_collection(request) - elif request.operation == "delete-collection": - await self.handle_delete_collection(request) + if self._collection_exists_in_db(user, collection): + logger.info(f"Collection {user}/{collection} already exists") else: - response = StorageManagementResponse( - error=Error( - type="invalid_operation", - message=f"Unknown operation: 
{request.operation}" - ) - ) - await self.storage_response_producer.send(response) + self._create_collection_in_db(user, collection) + logger.info(f"Created collection {user}/{collection}") except Exception as e: - logger.error(f"Error processing storage management request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="processing_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) + logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True) + raise - async def handle_create_collection(self, request): - """Create collection metadata in Memgraph""" - try: - if self.collection_exists(request.user, request.collection): - logger.info(f"Collection {request.user}/{request.collection} already exists") - else: - self.create_collection(request.user, request.collection) - logger.info(f"Created collection {request.user}/{request.collection}") - - # Send success response - response = StorageManagementResponse(error=None) - await self.storage_response_producer.send(response) - - except Exception as e: - logger.error(f"Failed to create collection: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="creation_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) - - async def handle_delete_collection(self, request): - """Delete all data for a specific collection""" + async def delete_collection(self, user: str, collection: str): + """Delete all data for a specific collection via config push""" try: with self.io.session(database=self.db) as session: # Delete all nodes for this user and collection node_result = session.run( "MATCH (n:Node {user: $user, collection: $collection}) " "DETACH DELETE n", - user=request.user, collection=request.collection + user=user, collection=collection ) nodes_deleted = node_result.consume().counters.nodes_deleted @@ -422,7 +350,7 @@ class Processor(TriplesStoreService): literal_result = 
session.run( "MATCH (n:Literal {user: $user, collection: $collection}) " "DETACH DELETE n", - user=request.user, collection=request.collection + user=user, collection=collection ) literals_deleted = literal_result.consume().counters.nodes_deleted @@ -430,20 +358,13 @@ class Processor(TriplesStoreService): metadata_result = session.run( "MATCH (c:CollectionMetadata {user: $user, collection: $collection}) " "DELETE c", - user=request.user, collection=request.collection + user=user, collection=collection ) metadata_deleted = metadata_result.consume().counters.nodes_deleted # Note: Relationships are automatically deleted with DETACH DELETE - logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {request.user}/{request.collection}") - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) - logger.info(f"Successfully deleted collection {request.user}/{request.collection}") + logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {user}/{collection}") except Exception as e: logger.error(f"Failed to delete collection: {e}") diff --git a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py index 227356ce..e33b26ca 100755 --- a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py @@ -11,11 +11,9 @@ import time import logging from neo4j import GraphDatabase -from .... base import TriplesStoreService +from .... base import TriplesStoreService, CollectionConfigHandler from .... base import AsyncProcessor, Consumer, Producer from .... base import ConsumerMetrics, ProducerMetrics -from .... schema import StorageManagementRequest, StorageManagementResponse, Error -from .... 
schema import triples_storage_management_topic, storage_management_response_topic # Module logger logger = logging.getLogger(__name__) @@ -27,10 +25,10 @@ default_username = 'neo4j' default_password = 'password' default_database = 'neo4j' -class Processor(TriplesStoreService): +class Processor(CollectionConfigHandler, TriplesStoreService): def __init__(self, **params): - + id = params.get("id", default_ident) graph_host = params.get("graph_host", default_graph_host) @@ -53,33 +51,8 @@ class Processor(TriplesStoreService): with self.io.session(database=self.db) as session: self.create_indexes(session) - # Set up metrics for storage management - storage_request_metrics = ConsumerMetrics( - processor=self.id, flow=None, name="storage-request" - ) - storage_response_metrics = ProducerMetrics( - processor=self.id, flow=None, name="storage-response" - ) - - # Set up consumer for storage management requests - self.storage_request_consumer = Consumer( - taskgroup=self.taskgroup, - client=self.pulsar_client, - flow=None, - topic=triples_storage_management_topic, - subscriber=f"{id}-storage", - schema=StorageManagementRequest, - handler=self.on_storage_management, - metrics=storage_request_metrics, - ) - - # Set up producer for storage management responses - self.storage_response_producer = Producer( - client=self.pulsar_client, - topic=storage_management_response_topic, - schema=StorageManagementResponse, - metrics=storage_response_metrics, - ) + # Register for config push notifications + self.register_config_handler(self.on_collection_config) def create_indexes(self, session): @@ -232,7 +205,7 @@ class Processor(TriplesStoreService): if not self.collection_exists(user, collection): error_msg = ( f"Collection {collection} does not exist. " - f"Create it first with tg-set-collection." + f"Create it first via collection management API." 
) logger.error(error_msg) raise ValueError(error_msg) @@ -277,42 +250,7 @@ class Processor(TriplesStoreService): help=f'Neo4j database (default: {default_database})' ) - async def start(self): - """Start the processor and its storage management consumer""" - await super().start() - await self.storage_request_consumer.start() - await self.storage_response_producer.start() - - async def on_storage_management(self, message, consumer, flow): - """Handle storage management requests""" - request = message.value() - logger.info(f"Storage management request: {request.operation} for {request.user}/{request.collection}") - - try: - if request.operation == "create-collection": - await self.handle_create_collection(request) - elif request.operation == "delete-collection": - await self.handle_delete_collection(request) - else: - response = StorageManagementResponse( - error=Error( - type="invalid_operation", - message=f"Unknown operation: {request.operation}" - ) - ) - await self.storage_response_producer.send(response) - - except Exception as e: - logger.error(f"Error processing storage management request: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="processing_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) - - def collection_exists(self, user, collection): + def _collection_exists_in_db(self, user, collection): """Check if collection metadata node exists""" with self.io.session(database=self.db) as session: result = session.run( @@ -322,7 +260,7 @@ class Processor(TriplesStoreService): ) return bool(list(result)) - def create_collection(self, user, collection): + def _create_collection_in_db(self, user, collection): """Create collection metadata node""" import datetime with self.io.session(database=self.db) as session: @@ -334,38 +272,28 @@ class Processor(TriplesStoreService): ) logger.info(f"Created collection metadata node for {user}/{collection}") - async def handle_create_collection(self, 
request): - """Create collection metadata in Neo4j""" + async def create_collection(self, user: str, collection: str, metadata: dict): + """Create collection metadata in Neo4j via config push""" try: - if self.collection_exists(request.user, request.collection): - logger.info(f"Collection {request.user}/{request.collection} already exists") + if self._collection_exists_in_db(user, collection): + logger.info(f"Collection {user}/{collection} already exists") else: - self.create_collection(request.user, request.collection) - logger.info(f"Created collection {request.user}/{request.collection}") - - # Send success response - response = StorageManagementResponse(error=None) - await self.storage_response_producer.send(response) + self._create_collection_in_db(user, collection) + logger.info(f"Created collection {user}/{collection}") except Exception as e: - logger.error(f"Failed to create collection: {e}", exc_info=True) - response = StorageManagementResponse( - error=Error( - type="creation_error", - message=str(e) - ) - ) - await self.storage_response_producer.send(response) + logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True) + raise - async def handle_delete_collection(self, request): - """Delete all data for a specific collection""" + async def delete_collection(self, user: str, collection: str): + """Delete all data for a specific collection via config push""" try: with self.io.session(database=self.db) as session: # Delete all nodes for this user and collection node_result = session.run( "MATCH (n:Node {user: $user, collection: $collection}) " "DETACH DELETE n", - user=request.user, collection=request.collection + user=user, collection=collection ) nodes_deleted = node_result.consume().counters.nodes_deleted @@ -373,7 +301,7 @@ class Processor(TriplesStoreService): literal_result = session.run( "MATCH (n:Literal {user: $user, collection: $collection}) " "DETACH DELETE n", - user=request.user, collection=request.collection + 
user=user, collection=collection ) literals_deleted = literal_result.consume().counters.nodes_deleted @@ -383,21 +311,14 @@ class Processor(TriplesStoreService): metadata_result = session.run( "MATCH (c:CollectionMetadata {user: $user, collection: $collection}) " "DELETE c", - user=request.user, collection=request.collection + user=user, collection=collection ) metadata_deleted = metadata_result.consume().counters.nodes_deleted - logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {request.user}/{request.collection}") - - # Send success response - response = StorageManagementResponse( - error=None # No error means success - ) - await self.storage_response_producer.send(response) - logger.info(f"Successfully deleted collection {request.user}/{request.collection}") + logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {user}/{collection}") except Exception as e: - logger.error(f"Failed to delete collection: {e}") + logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True) raise def run(): diff --git a/trustgraph-flow/trustgraph/tables/library.py b/trustgraph-flow/trustgraph/tables/library.py index 839f3afa..b5465f90 100644 --- a/trustgraph-flow/trustgraph/tables/library.py +++ b/trustgraph-flow/trustgraph/tables/library.py @@ -111,21 +111,6 @@ class LibraryTableStore: ); """); - logger.debug("collections table...") - - self.cassandra.execute(""" - CREATE TABLE IF NOT EXISTS collections ( - user text, - collection text, - name text, - description text, - tags set, - created_at timestamp, - updated_at timestamp, - PRIMARY KEY (user, collection) - ); - """); - logger.info("Cassandra schema OK.") def prepare_statements(self): @@ -202,43 +187,6 @@ class LibraryTableStore: LIMIT 1 """) - # Collection management statements - self.insert_collection_stmt = self.cassandra.prepare(""" - INSERT INTO collections - (user, 
collection, name, description, tags, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?) - """) - - self.update_collection_stmt = self.cassandra.prepare(""" - UPDATE collections - SET name = ?, description = ?, tags = ?, updated_at = ? - WHERE user = ? AND collection = ? - """) - - self.get_collection_stmt = self.cassandra.prepare(""" - SELECT collection, name, description, tags, created_at, updated_at - FROM collections - WHERE user = ? AND collection = ? - """) - - self.list_collections_stmt = self.cassandra.prepare(""" - SELECT collection, name, description, tags, created_at, updated_at - FROM collections - WHERE user = ? - """) - - self.delete_collection_stmt = self.cassandra.prepare(""" - DELETE FROM collections - WHERE user = ? AND collection = ? - """) - - self.collection_exists_stmt = self.cassandra.prepare(""" - SELECT collection - FROM collections - WHERE user = ? AND collection = ? - LIMIT 1 - """) - self.list_processing_stmt = self.cassandra.prepare(""" SELECT id, document_id, time, flow, collection, tags @@ -572,146 +520,3 @@ class LibraryTableStore: logger.debug("Done") return lst - - - - # Collection management methods - - async def ensure_collection_exists(self, user, collection): - """Ensure collection metadata record exists, create if not""" - try: - resp = await asyncio.get_event_loop().run_in_executor( - None, self.cassandra.execute, self.collection_exists_stmt, [user, collection] - ) - if resp: - return - import datetime - now = datetime.datetime.now() - await asyncio.get_event_loop().run_in_executor( - None, self.cassandra.execute, self.insert_collection_stmt, - [user, collection, collection, "", set(), now, now] - ) - logger.debug(f"Created collection metadata for {user}/{collection}") - except Exception as e: - logger.error(f"Error ensuring collection exists: {e}") - raise - - async def list_collections(self, user, tag_filter=None): - """List collections for a user, optionally filtered by tags""" - try: - resp = await 
asyncio.get_event_loop().run_in_executor( - None, self.cassandra.execute, self.list_collections_stmt, [user] - ) - collections = [] - for row in resp: - collection_data = { - "user": user, - "collection": row[0], - "name": row[1] or row[0], - "description": row[2] or "", - "tags": list(row[3]) if row[3] else [], - "created_at": row[4].isoformat() if row[4] else "", - "updated_at": row[5].isoformat() if row[5] else "" - } - if tag_filter: - collection_tags = set(collection_data["tags"]) - filter_tags = set(tag_filter) - if not filter_tags.intersection(collection_tags): - continue - collections.append(collection_data) - return collections - except Exception as e: - logger.error(f"Error listing collections: {e}") - raise - - async def update_collection(self, user, collection, name=None, description=None, tags=None): - """Update collection metadata""" - try: - resp = await asyncio.get_event_loop().run_in_executor( - None, self.cassandra.execute, self.get_collection_stmt, [user, collection] - ) - if not resp: - raise RequestError(f"Collection {collection} not found") - row = resp.one() - current_name = row[1] or collection - current_description = row[2] or "" - current_tags = set(row[3]) if row[3] else set() - new_name = name if name is not None else current_name - new_description = description if description is not None else current_description - new_tags = set(tags) if tags is not None else current_tags - import datetime - now = datetime.datetime.now() - await asyncio.get_event_loop().run_in_executor( - None, self.cassandra.execute, self.update_collection_stmt, - [new_name, new_description, new_tags, now, user, collection] - ) - return { - "user": user, "collection": collection, "name": new_name, - "description": new_description, "tags": list(new_tags), - "updated_at": now.isoformat() - } - except Exception as e: - logger.error(f"Error updating collection: {e}") - raise - - async def delete_collection(self, user, collection): - """Delete collection metadata record""" 
- try: - await asyncio.get_event_loop().run_in_executor( - None, self.cassandra.execute, self.delete_collection_stmt, [user, collection] - ) - logger.debug(f"Deleted collection metadata for {user}/{collection}") - except Exception as e: - logger.error(f"Error deleting collection metadata: {e}") - raise - - async def get_collection(self, user, collection): - """Get collection metadata""" - try: - resp = await asyncio.get_event_loop().run_in_executor( - None, self.cassandra.execute, self.get_collection_stmt, [user, collection] - ) - if not resp: - return None - row = resp.one() - return { - "user": user, "collection": row[0], "name": row[1] or row[0], - "description": row[2] or "", "tags": list(row[3]) if row[3] else [], - "created_at": row[4].isoformat() if row[4] else "", - "updated_at": row[5].isoformat() if row[5] else "" - } - except Exception as e: - logger.error(f"Error getting collection: {e}") - raise - - async def create_collection(self, user, collection, name=None, description=None, tags=None): - """Create a new collection metadata record""" - try: - import datetime - now = datetime.datetime.now() - - # Set defaults for optional parameters - name = name if name is not None else collection - description = description if description is not None else "" - tags = tags if tags is not None else set() - - await asyncio.get_event_loop().run_in_executor( - None, self.cassandra.execute, self.insert_collection_stmt, - [user, collection, name, description, tags, now, now] - ) - - logger.info(f"Created collection {user}/{collection}") - - # Return the created collection data - return { - "user": user, - "collection": collection, - "name": name, - "description": description, - "tags": list(tags) if isinstance(tags, set) else tags, - "created_at": now.isoformat(), - "updated_at": now.isoformat() - } - except Exception as e: - logger.error(f"Error creating collection: {e}") - raise