From 00c1ca681b684275a00eedac965ed8d0812e0a52 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Mon, 16 Feb 2026 13:26:43 +0000 Subject: [PATCH] Entity-centric graph (#633) * Tech spec for new entity-centric graph schema * Graph implementation --- docs/tech-specs/entity-centric-graph.md | 260 ++++++++ tests/pytest.ini | 3 +- .../test_direct/test_entity_centric_kg.py | 599 ++++++++++++++++++ .../test_triples_cassandra_query.py | 291 +++++---- .../test_triples_cassandra_storage.py | 197 +++--- .../trustgraph/direct/cassandra_kg.py | 577 +++++++++++++++++ .../query/triples/cassandra/service.py | 79 ++- .../storage/triples/cassandra/write.py | 77 ++- 8 files changed, 1858 insertions(+), 225 deletions(-) create mode 100644 docs/tech-specs/entity-centric-graph.md create mode 100644 tests/unit/test_direct/test_entity_centric_kg.py diff --git a/docs/tech-specs/entity-centric-graph.md b/docs/tech-specs/entity-centric-graph.md new file mode 100644 index 00000000..aa695811 --- /dev/null +++ b/docs/tech-specs/entity-centric-graph.md @@ -0,0 +1,260 @@ +# Entity-Centric Knowledge Graph Storage on Cassandra + +## Overview + +This document describes a storage model for RDF-style knowledge graphs on Apache Cassandra. The model uses an **entity-centric** approach where every entity knows every quad it participates in and the role it plays. This replaces a traditional multi-table SPO permutation approach with just two tables. + +## Background and Motivation + +### The Traditional Approach + +A standard RDF quad store on Cassandra requires multiple denormalised tables to cover query patterns — typically 6 or more tables representing different permutations of Subject, Predicate, Object, and Dataset (SPOD). Each quad is written to every table, resulting in significant write amplification, operational overhead, and schema complexity. + +Additionally, label resolution (fetching human-readable names for entities) requires separate round-trip queries, which is particularly costly in AI and GraphRAG use cases where labels are essential for LLM context. + +### The Entity-Centric Insight + +Every quad `(D, S, P, O)` involves up to 4 entities. By writing a row for each entity's participation in the quad, we guarantee that **any query with at least one known element will hit a partition key**. This covers all 16 query patterns with a single data table. + +Key benefits: + +- **2 tables** instead of 7+ +- **4 writes per quad** instead of 6+ +- **Label resolution for free** — an entity's labels are co-located with its relationships, naturally warming the application cache +- **All 16 query patterns** served by single-partition reads +- **Simpler operations** — one data table to tune, compact, and repair + +## Schema + +### Table 1: quads_by_entity + +The primary data table. Every entity has a partition containing all quads it participates in. Named to reflect the query pattern (lookup by entity). + +```sql +CREATE TABLE quads_by_entity ( + collection text, -- Collection/tenant scope (always specified) + entity text, -- The entity this row is about + role text, -- 'S', 'P', 'O', 'G' — how this entity participates + p text, -- Predicate of the quad + otype text, -- 'U' (URI), 'L' (literal), 'T' (triple/reification) + s text, -- Subject of the quad + o text, -- Object of the quad + d text, -- Dataset/graph of the quad + dtype text, -- XSD datatype (when otype = 'L'), e.g. 'xsd:string' + lang text, -- Language tag (when otype = 'L'), e.g. 'en', 'fr' + PRIMARY KEY ((collection, entity), role, p, otype, s, o, d) +); +``` + +**Partition key**: `(collection, entity)` — scoped to collection, one partition per entity. + +**Clustering column order rationale**: + +1. **role** — most queries start with "where is this entity a subject/object" +2. **p** — next most common filter, "give me all `knows` relationships" +3. **otype** — enables filtering by URI-valued vs literal-valued relationships +4. **s, o, d** — remaining columns for uniqueness + +### Table 2: quads_by_collection + +Supports collection-level queries and deletion. Provides a manifest of all quads belonging to a collection. Named to reflect the query pattern (lookup by collection). + +```sql +CREATE TABLE quads_by_collection ( + collection text, + d text, -- Dataset/graph of the quad + s text, -- Subject of the quad + p text, -- Predicate of the quad + o text, -- Object of the quad + otype text, -- 'U' (URI), 'L' (literal), 'T' (triple/reification) + dtype text, -- XSD datatype (when otype = 'L') + lang text, -- Language tag (when otype = 'L') + PRIMARY KEY (collection, d, s, p, o) +); +``` + +Clustered by dataset first, enabling deletion at either collection or dataset granularity. + +## Write Path + +For each incoming quad `(D, S, P, O)` within a collection `C`, write **4 rows** to `quads_by_entity` and **1 row** to `quads_by_collection`. + +### Example + +Given the quad in collection `tenant1`: + +``` +Dataset: https://example.org/graph1 +Subject: https://example.org/Alice +Predicate: https://example.org/knows +Object: https://example.org/Bob +``` + +Write 4 rows to `quads_by_entity`: + +| collection | entity | role | p | otype | s | o | d | +|---|---|---|---|---|---|---|---| +| tenant1 | https://example.org/graph1 | G | https://example.org/knows | U | https://example.org/Alice | https://example.org/Bob | https://example.org/graph1 | +| tenant1 | https://example.org/Alice | S | https://example.org/knows | U | https://example.org/Alice | https://example.org/Bob | https://example.org/graph1 | +| tenant1 | https://example.org/knows | P | https://example.org/knows | U | https://example.org/Alice | https://example.org/Bob | https://example.org/graph1 | +| tenant1 | https://example.org/Bob | O | https://example.org/knows | U | https://example.org/Alice | https://example.org/Bob | https://example.org/graph1 | + +Write 1 row to `quads_by_collection`: + +| collection | d | s | p | o | otype | dtype | lang | +|---|---|---|---|---|---|---|---| +| tenant1 | https://example.org/graph1 | https://example.org/Alice | https://example.org/knows | https://example.org/Bob | U | | | + +### Literal Example + +For a label triple: + +``` +Dataset: https://example.org/graph1 +Subject: https://example.org/Alice +Predicate: http://www.w3.org/2000/01/rdf-schema#label +Object: "Alice Smith" (lang: en) +``` + +The `otype` is `'L'`, `dtype` is `'xsd:string'`, and `lang` is `'en'`. The literal value `"Alice Smith"` is stored in `o`. Only 3 rows are needed in `quads_by_entity` — no row is written for the literal as entity, since literals are not independently queryable entities. + +## Query Patterns + +### All 16 DSPO Patterns + +In the table below, "Perfect prefix" means the query uses a contiguous prefix of the clustering columns. "Partition scan + filter" means Cassandra reads a slice of one partition and filters in memory — still efficient, just not a pure prefix match. + +| # | Known | Lookup entity | Clustering prefix | Efficiency | +|---|---|---|---|---| +| 1 | D,S,P,O | entity=S, role='S', p=P | Full match | Perfect prefix | +| 2 | D,S,P,? | entity=S, role='S', p=P | Filter on D | Partition scan + filter | +| 3 | D,S,?,O | entity=S, role='S' | Filter on D, O | Partition scan + filter | +| 4 | D,?,P,O | entity=O, role='O', p=P | Filter on D | Partition scan + filter | +| 5 | ?,S,P,O | entity=S, role='S', p=P | Filter on O | Partition scan + filter | +| 6 | D,S,?,? | entity=S, role='S' | Filter on D | Partition scan + filter | +| 7 | D,?,P,? | entity=P, role='P' | Filter on D | Partition scan + filter | +| 8 | D,?,?,O | entity=O, role='O' | Filter on D | Partition scan + filter | +| 9 | ?,S,P,? | entity=S, role='S', p=P | — | **Perfect prefix** | +| 10 | ?,S,?,O | entity=S, role='S' | Filter on O | Partition scan + filter | +| 11 | ?,?,P,O | entity=O, role='O', p=P | — | **Perfect prefix** | +| 12 | D,?,?,? | entity=D, role='G' | — | **Perfect prefix** | +| 13 | ?,S,?,? | entity=S, role='S' | — | **Perfect prefix** | +| 14 | ?,?,P,? | entity=P, role='P' | — | **Perfect prefix** | +| 15 | ?,?,?,O | entity=O, role='O' | — | **Perfect prefix** | +| 16 | ?,?,?,? | — | Full scan | Exploration only | + +**Key result**: 7 of the 15 non-trivial patterns are perfect clustering prefix hits. The remaining 8 are single-partition reads with in-partition filtering. Every query with at least one known element hits a partition key. + +Pattern 16 (?,?,?,?) does not occur in practice since collection is always specified, reducing it to pattern 12. + +### Common Query Examples + +**Everything about an entity:** + +```sql +SELECT * FROM quads_by_entity +WHERE collection = 'tenant1' AND entity = 'https://example.org/Alice'; +``` + +**All outgoing relationships for an entity:** + +```sql +SELECT * FROM quads_by_entity +WHERE collection = 'tenant1' AND entity = 'https://example.org/Alice' +AND role = 'S'; +``` + +**Specific predicate for an entity:** + +```sql +SELECT * FROM quads_by_entity +WHERE collection = 'tenant1' AND entity = 'https://example.org/Alice' +AND role = 'S' AND p = 'https://example.org/knows'; +``` + +**Label for an entity (specific language):** + +```sql +SELECT * FROM quads_by_entity +WHERE collection = 'tenant1' AND entity = 'https://example.org/Alice' +AND role = 'S' AND p = 'http://www.w3.org/2000/01/rdf-schema#label' +AND otype = 'L'; +``` + +Then filter by `lang = 'en'` application-side if needed. + +**Only URI-valued relationships (entity-to-entity links):** + +```sql +SELECT * FROM quads_by_entity +WHERE collection = 'tenant1' AND entity = 'https://example.org/Alice' +AND role = 'S' AND p = 'https://example.org/knows' AND otype = 'U'; +``` + +**Reverse lookup — what points to this entity:** + +```sql +SELECT * FROM quads_by_entity +WHERE collection = 'tenant1' AND entity = 'https://example.org/Bob' +AND role = 'O'; +``` + +## Label Resolution and Cache Warming + +One of the most significant advantages of the entity-centric model is that **label resolution becomes a free side effect**. + +In the traditional multi-table model, fetching labels requires separate round-trip queries: retrieve triples, identify entity URIs in the results, then fetch `rdfs:label` for each. This N+1 pattern is expensive. + +In the entity-centric model, querying an entity returns **all** its quads — including its labels, types, and other properties. When the application caches query results, labels are pre-warmed before anything asks for them. + +Two usage regimes confirm this works well in practice: + +- **Human-facing queries**: naturally small result sets, labels essential. Entity reads pre-warm the cache. +- **AI/bulk queries**: large result sets with hard limits. Labels either unnecessary or needed only for a curated subset of entities already in cache. + +The theoretical concern of resolving labels for huge result sets (e.g. 30,000 entities) is mitigated by the practical observation that no human or AI consumer usefully processes that many labels. Application-level query limits ensure cache pressure remains manageable. + +## Wide Partitions and Reification + +Reification (RDF-star style statements about statements) creates hub entities — e.g. a source document that supports thousands of extracted facts. This can produce wide partitions. + +Mitigating factors: + +- **Application-level query limits**: all GraphRAG and human-facing queries enforce hard limits, so wide partitions are never fully scanned on the hot read path +- **Cassandra handles partial reads efficiently**: a clustering column scan with an early stop is fast even on large partitions +- **Collection deletion** (the only operation that might traverse full partitions) is an accepted background process + +## Collection Deletion + +Triggered by API call, runs in the background (eventually consistent). + +1. Read `quads_by_collection` for the target collection to get all quads +2. Extract unique entities from the quads (s, p, o, d values) +3. For each unique entity, delete the partition from `quads_by_entity` +4. Delete the rows from `quads_by_collection` + +The `quads_by_collection` table provides the index needed to locate all entity partitions without a full table scan. Partition-level deletes are efficient since `(collection, entity)` is the partition key. + +## Migration Path from Multi-Table Model + +The entity-centric model can coexist with the existing multi-table model during migration: + +1. Deploy `quads_by_entity` and `quads_by_collection` tables alongside existing tables +2. Dual-write new quads to both old and new tables +3. Backfill existing data into the new tables +4. Migrate read paths one query pattern at a time +5. Decommission old tables once all reads are migrated + +## Summary + +| Aspect | Traditional (6-table) | Entity-centric (2-table) | +|---|---|---| +| Tables | 7+ | 2 | +| Writes per quad | 6+ | 5 (4 data + 1 manifest) | +| Label resolution | Separate round trips | Free via cache warming | +| Query patterns | 16 across 6 tables | 16 on 1 table | +| Schema complexity | High | Low | +| Operational overhead | 6 tables to tune/repair | 1 data table | +| Reification support | Additional complexity | Natural fit | +| Object type filtering | Not available | Native (via otype clustering) | + diff --git a/tests/pytest.ini b/tests/pytest.ini index b763299c..b032a9d4 100644 --- a/tests/pytest.ini +++ b/tests/pytest.ini @@ -19,4 +19,5 @@ markers = integration: marks tests as integration tests unit: marks tests as unit tests contract: marks tests as contract tests (service interface validation) - vertexai: marks tests as vertex ai specific tests \ No newline at end of file + vertexai: marks tests as vertex ai specific tests + asyncio: marks tests that use asyncio \ No newline at end of file diff --git a/tests/unit/test_direct/test_entity_centric_kg.py b/tests/unit/test_direct/test_entity_centric_kg.py new file mode 100644 index 00000000..5f64b581 --- /dev/null +++ b/tests/unit/test_direct/test_entity_centric_kg.py @@ -0,0 +1,599 @@ +""" +Unit tests for EntityCentricKnowledgeGraph class + +Tests the entity-centric knowledge graph implementation without requiring +an actual Cassandra connection. Uses mocking to verify correct behavior. +""" + +import pytest +from unittest.mock import MagicMock, patch, call +import os + + +class TestEntityCentricKnowledgeGraph: + """Test cases for EntityCentricKnowledgeGraph""" + + @pytest.fixture + def mock_cluster(self): + """Create a mock Cassandra cluster""" + with patch('trustgraph.direct.cassandra_kg.Cluster') as mock_cluster_cls: + mock_cluster = MagicMock() + mock_session = MagicMock() + mock_cluster.connect.return_value = mock_session + mock_cluster_cls.return_value = mock_cluster + yield mock_cluster_cls, mock_cluster, mock_session + + @pytest.fixture + def entity_kg(self, mock_cluster): + """Create an EntityCentricKnowledgeGraph instance with mocked Cassandra""" + from trustgraph.direct.cassandra_kg import EntityCentricKnowledgeGraph + mock_cluster_cls, mock_cluster, mock_session = mock_cluster + + # Create instance + kg = EntityCentricKnowledgeGraph(hosts=['localhost'], keyspace='test_keyspace') + return kg, mock_session + + def test_init_creates_entity_centric_schema(self, mock_cluster): + """Test that initialization creates the 2-table entity-centric schema""" + from trustgraph.direct.cassandra_kg import EntityCentricKnowledgeGraph + mock_cluster_cls, mock_cluster, mock_session = mock_cluster + + kg = EntityCentricKnowledgeGraph(hosts=['localhost'], keyspace='test_keyspace') + + # Verify schema tables were created + execute_calls = mock_session.execute.call_args_list + executed_statements = [str(c) for c in execute_calls] + + # Check for keyspace creation + keyspace_created = any('create keyspace' in str(c).lower() for c in execute_calls) + assert keyspace_created + + # Check for quads_by_entity table + entity_table_created = any('quads_by_entity' in str(c) for c in execute_calls) + assert entity_table_created + + # Check for quads_by_collection table + collection_table_created = any('quads_by_collection' in str(c) for c in execute_calls) + assert collection_table_created + + # Check for collection_metadata table + metadata_table_created = any('collection_metadata' in str(c) for c in execute_calls) + assert metadata_table_created + + def test_prepare_statements_initialized(self, entity_kg): + """Test that prepared statements are initialized""" + kg, mock_session = entity_kg + + # Verify prepare was called for various statements + assert mock_session.prepare.called + prepare_calls = mock_session.prepare.call_args_list + + # Check that key prepared statements exist + prepared_queries = [str(c) for c in prepare_calls] + + # Insert statements + insert_entity_stmt = any('INSERT INTO' in str(c) and 'quads_by_entity' in str(c) + for c in prepare_calls) + assert insert_entity_stmt + + insert_collection_stmt = any('INSERT INTO' in str(c) and 'quads_by_collection' in str(c) + for c in prepare_calls) + assert insert_collection_stmt + + def test_insert_uri_object_creates_4_entity_rows(self, entity_kg): + """Test that inserting a quad with URI object creates 4 entity rows""" + kg, mock_session = entity_kg + + # Reset mocks to track only insert-related calls + mock_session.reset_mock() + + kg.insert( + collection='test_collection', + s='http://example.org/Alice', + p='http://example.org/knows', + o='http://example.org/Bob', + g='http://example.org/graph1', + otype='u' + ) + + # Verify batch was executed + mock_session.execute.assert_called() + + def test_insert_literal_object_creates_3_entity_rows(self, entity_kg): + """Test that inserting a quad with literal object creates 3 entity rows""" + kg, mock_session = entity_kg + + mock_session.reset_mock() + + kg.insert( + collection='test_collection', + s='http://example.org/Alice', + p='http://www.w3.org/2000/01/rdf-schema#label', + o='Alice Smith', + g=None, + otype='l', + dtype='xsd:string', + lang='en' + ) + + # Verify batch was executed + mock_session.execute.assert_called() + + def test_insert_default_graph(self, entity_kg): + """Test that None graph is stored as empty string""" + kg, mock_session = entity_kg + + mock_session.reset_mock() + + kg.insert( + collection='test_collection', + s='http://example.org/Alice', + p='http://example.org/knows', + o='http://example.org/Bob', + g=None, + otype='u' + ) + + mock_session.execute.assert_called() + + def test_insert_auto_detects_otype(self, entity_kg): + """Test that otype is auto-detected when not provided""" + kg, mock_session = entity_kg + + mock_session.reset_mock() + + # URI should be auto-detected + kg.insert( + collection='test_collection', + s='http://example.org/Alice', + p='http://example.org/knows', + o='http://example.org/Bob' + ) + mock_session.execute.assert_called() + + mock_session.reset_mock() + + # Literal should be auto-detected + kg.insert( + collection='test_collection', + s='http://example.org/Alice', + p='http://example.org/name', + o='Alice' + ) + mock_session.execute.assert_called() + + def test_get_s_returns_quads_for_subject(self, entity_kg): + """Test get_s queries by subject""" + kg, mock_session = entity_kg + + # Mock the query result + mock_result = [ + MagicMock(p='http://example.org/knows', o='http://example.org/Bob', + d='', otype='u', dtype='', lang='', s='http://example.org/Alice') + ] + mock_session.execute.return_value = mock_result + + results = kg.get_s('test_collection', 'http://example.org/Alice') + + # Verify query was executed + mock_session.execute.assert_called() + + # Results should be QuadResult objects + assert len(results) == 1 + assert results[0].s == 'http://example.org/Alice' + assert results[0].p == 'http://example.org/knows' + assert results[0].o == 'http://example.org/Bob' + + def test_get_p_returns_quads_for_predicate(self, entity_kg): + """Test get_p queries by predicate""" + kg, mock_session = entity_kg + + mock_result = [ + MagicMock(s='http://example.org/Alice', o='http://example.org/Bob', + d='', otype='u', dtype='', lang='', p='http://example.org/knows') + ] + mock_session.execute.return_value = mock_result + + results = kg.get_p('test_collection', 'http://example.org/knows') + + mock_session.execute.assert_called() + assert len(results) == 1 + + def test_get_o_returns_quads_for_object(self, entity_kg): + """Test get_o queries by object""" + kg, mock_session = entity_kg + + mock_result = [ + MagicMock(s='http://example.org/Alice', p='http://example.org/knows', + d='', otype='u', dtype='', lang='', o='http://example.org/Bob') + ] + mock_session.execute.return_value = mock_result + + results = kg.get_o('test_collection', 'http://example.org/Bob') + + mock_session.execute.assert_called() + assert len(results) == 1 + + def test_get_sp_returns_quads_for_subject_predicate(self, entity_kg): + """Test get_sp queries by subject and predicate""" + kg, mock_session = entity_kg + + mock_result = [ + MagicMock(o='http://example.org/Bob', d='', otype='u', dtype='', lang='') + ] + mock_session.execute.return_value = mock_result + + results = kg.get_sp('test_collection', 'http://example.org/Alice', + 'http://example.org/knows') + + mock_session.execute.assert_called() + assert len(results) == 1 + + def test_get_po_returns_quads_for_predicate_object(self, entity_kg): + """Test get_po queries by predicate and object""" + kg, mock_session = entity_kg + + mock_result = [ + MagicMock(s='http://example.org/Alice', d='', otype='u', dtype='', lang='', + o='http://example.org/Bob') + ] + mock_session.execute.return_value = mock_result + + results = kg.get_po('test_collection', 'http://example.org/knows', + 'http://example.org/Bob') + + mock_session.execute.assert_called() + assert len(results) == 1 + + def test_get_os_returns_quads_for_object_subject(self, entity_kg): + """Test get_os queries by object and subject""" + kg, mock_session = entity_kg + + mock_result = [ + MagicMock(p='http://example.org/knows', d='', otype='u', dtype='', lang='', + s='http://example.org/Alice', o='http://example.org/Bob') + ] + mock_session.execute.return_value = mock_result + + results = kg.get_os('test_collection', 'http://example.org/Bob', + 'http://example.org/Alice') + + mock_session.execute.assert_called() + assert len(results) == 1 + + def test_get_spo_returns_quads_for_subject_predicate_object(self, entity_kg): + """Test get_spo queries by subject, predicate, and object""" + kg, mock_session = entity_kg + + mock_result = [ + MagicMock(d='', otype='u', dtype='', lang='', + o='http://example.org/Bob') + ] + mock_session.execute.return_value = mock_result + + results = kg.get_spo('test_collection', 'http://example.org/Alice', + 'http://example.org/knows', 'http://example.org/Bob') + + mock_session.execute.assert_called() + assert len(results) == 1 + + def test_get_g_returns_quads_for_graph(self, entity_kg): + """Test get_g queries by graph""" + kg, mock_session = entity_kg + + mock_result = [ + MagicMock(s='http://example.org/Alice', p='http://example.org/knows', + o='http://example.org/Bob', otype='u', dtype='', lang='') + ] + mock_session.execute.return_value = mock_result + + results = kg.get_g('test_collection', 'http://example.org/graph1') + + mock_session.execute.assert_called() + + def test_get_all_returns_all_quads_in_collection(self, entity_kg): + """Test get_all returns all quads""" + kg, mock_session = entity_kg + + mock_result = [ + MagicMock(d='', s='http://example.org/Alice', p='http://example.org/knows', + o='http://example.org/Bob', otype='u', dtype='', lang='') + ] + mock_session.execute.return_value = mock_result + + results = kg.get_all('test_collection') + + mock_session.execute.assert_called() + + def test_graph_wildcard_returns_all_graphs(self, entity_kg): + """Test that g='*' returns quads from all graphs""" + from trustgraph.direct.cassandra_kg import GRAPH_WILDCARD + kg, mock_session = entity_kg + + mock_result = [ + MagicMock(p='http://example.org/knows', d='http://example.org/graph1', + otype='u', dtype='', lang='', s='http://example.org/Alice', + o='http://example.org/Bob'), + MagicMock(p='http://example.org/knows', d='http://example.org/graph2', + otype='u', dtype='', lang='', s='http://example.org/Alice', + o='http://example.org/Charlie') + ] + mock_session.execute.return_value = mock_result + + results = kg.get_s('test_collection', 'http://example.org/Alice', g=GRAPH_WILDCARD) + + # Should return quads from both graphs + assert len(results) == 2 + + def test_specific_graph_filters_results(self, entity_kg): + """Test that specifying a graph filters results""" + kg, mock_session = entity_kg + + mock_result = [ + MagicMock(p='http://example.org/knows', d='http://example.org/graph1', + otype='u', dtype='', lang='', s='http://example.org/Alice', + o='http://example.org/Bob'), + MagicMock(p='http://example.org/knows', d='http://example.org/graph2', + otype='u', dtype='', lang='', s='http://example.org/Alice', + o='http://example.org/Charlie') + ] + mock_session.execute.return_value = mock_result + + results = kg.get_s('test_collection', 'http://example.org/Alice', + g='http://example.org/graph1') + + # Should only return quads from graph1 + assert len(results) == 1 + assert results[0].g == 'http://example.org/graph1' + + def test_collection_exists_returns_true_when_exists(self, entity_kg): + """Test collection_exists returns True for existing collection""" + kg, mock_session = entity_kg + + mock_result = [MagicMock(collection='test_collection')] + mock_session.execute.return_value = mock_result + + exists = kg.collection_exists('test_collection') + + assert exists is True + + def test_collection_exists_returns_false_when_not_exists(self, entity_kg): + """Test collection_exists returns False for non-existing collection""" + kg, mock_session = entity_kg + + mock_session.execute.return_value = [] + + exists = kg.collection_exists('nonexistent_collection') + + assert exists is False + + def test_create_collection_inserts_metadata(self, entity_kg): + """Test create_collection inserts metadata row""" + kg, mock_session = entity_kg + + mock_session.reset_mock() + kg.create_collection('test_collection') + + # Verify INSERT was executed for collection_metadata + mock_session.execute.assert_called() + + def test_delete_collection_removes_all_data(self, entity_kg): + """Test delete_collection removes entity partitions and collection rows""" + kg, mock_session = entity_kg + + # Mock reading quads from collection + mock_quads = [ + MagicMock(d='', s='http://example.org/Alice', p='http://example.org/knows', + o='http://example.org/Bob', otype='u') + ] + mock_session.execute.return_value = mock_quads + + mock_session.reset_mock() + kg.delete_collection('test_collection') + + # Verify delete operations were executed + assert mock_session.execute.called + + def test_close_shuts_down_connections(self, entity_kg): + """Test close shuts down session and cluster""" + kg, mock_session = entity_kg + + kg.close() + + mock_session.shutdown.assert_called_once() + kg.cluster.shutdown.assert_called_once() + + +class TestQuadResult: + """Test cases for QuadResult class""" + + def test_quad_result_stores_all_fields(self): + """Test QuadResult stores all quad fields""" + from trustgraph.direct.cassandra_kg import QuadResult + + result = QuadResult( + s='http://example.org/Alice', + p='http://example.org/knows', + o='http://example.org/Bob', + g='http://example.org/graph1', + otype='u', + dtype='', + lang='' + ) + + assert result.s == 'http://example.org/Alice' + assert result.p == 'http://example.org/knows' + assert result.o == 'http://example.org/Bob' + assert result.g == 'http://example.org/graph1' + assert result.otype == 'u' + assert result.dtype == '' + assert result.lang == '' + + def test_quad_result_defaults(self): + """Test QuadResult default values""" + from trustgraph.direct.cassandra_kg import QuadResult + + result = QuadResult( + s='http://example.org/s', + p='http://example.org/p', + o='literal value', + g='' + ) + + assert result.otype == 'u' # Default otype + assert result.dtype == '' + assert result.lang == '' + + def test_quad_result_with_literal_metadata(self): + """Test QuadResult with literal metadata""" + from trustgraph.direct.cassandra_kg import QuadResult + + result = QuadResult( + s='http://example.org/Alice', + p='http://www.w3.org/2000/01/rdf-schema#label', + o='Alice Smith', + g='', + otype='l', + dtype='xsd:string', + lang='en' + ) + + assert result.otype == 'l' + assert result.dtype == 'xsd:string' + assert result.lang == 'en' + + +class TestWriteHelperFunctions: + """Test cases for helper functions in write.py""" + + def test_get_term_otype_for_iri(self): + """Test get_term_otype returns 'u' for IRI terms""" + from trustgraph.storage.triples.cassandra.write import get_term_otype + from trustgraph.schema import Term, IRI + + term = Term(type=IRI, iri='http://example.org/Alice') + assert get_term_otype(term) == 'u' + + def test_get_term_otype_for_literal(self): + """Test get_term_otype returns 'l' for LITERAL terms""" + from trustgraph.storage.triples.cassandra.write import get_term_otype + from trustgraph.schema import Term, LITERAL + + term = Term(type=LITERAL, value='Alice Smith') + assert get_term_otype(term) == 'l' + + def test_get_term_otype_for_blank(self): + """Test get_term_otype returns 'u' for BLANK terms""" + from trustgraph.storage.triples.cassandra.write import get_term_otype + from trustgraph.schema import Term, BLANK + + term = Term(type=BLANK, id='_:b1') + assert get_term_otype(term) == 'u' + + def test_get_term_otype_for_triple(self): + """Test get_term_otype returns 't' for TRIPLE terms""" + from trustgraph.storage.triples.cassandra.write import get_term_otype + from trustgraph.schema import Term, TRIPLE + + term = Term(type=TRIPLE) + assert get_term_otype(term) == 't' + + def test_get_term_otype_for_none(self): + """Test get_term_otype returns 'u' for None""" + from trustgraph.storage.triples.cassandra.write import get_term_otype + + assert get_term_otype(None) == 'u' + + def test_get_term_dtype_for_literal(self): + """Test get_term_dtype extracts datatype from LITERAL""" + from trustgraph.storage.triples.cassandra.write import get_term_dtype + from trustgraph.schema import Term, LITERAL + + term = Term(type=LITERAL, value='42', datatype='xsd:integer') + assert get_term_dtype(term) == 'xsd:integer' + + def test_get_term_dtype_for_non_literal(self): + """Test get_term_dtype returns empty string for non-LITERAL""" + from trustgraph.storage.triples.cassandra.write import get_term_dtype + from trustgraph.schema import Term, IRI + + term = Term(type=IRI, iri='http://example.org/Alice') + assert get_term_dtype(term) == '' + + def test_get_term_dtype_for_none(self): + """Test get_term_dtype returns empty string for None""" + from trustgraph.storage.triples.cassandra.write import get_term_dtype + + assert get_term_dtype(None) == '' + + def test_get_term_lang_for_literal(self): + """Test get_term_lang extracts language from LITERAL""" + from trustgraph.storage.triples.cassandra.write import get_term_lang + from trustgraph.schema import Term, LITERAL + + term = Term(type=LITERAL, value='Alice Smith', language='en') + assert get_term_lang(term) == 'en' + + def test_get_term_lang_for_non_literal(self): + """Test get_term_lang returns empty string for non-LITERAL""" + from trustgraph.storage.triples.cassandra.write import get_term_lang + from trustgraph.schema import Term, IRI + + term = Term(type=IRI, iri='http://example.org/Alice') + assert get_term_lang(term) == '' + + +class TestServiceHelperFunctions: + """Test cases for helper functions in service.py""" + + def test_create_term_with_uri_otype(self): + """Test create_term creates IRI Term for otype='u'""" + from trustgraph.query.triples.cassandra.service import create_term + from trustgraph.schema import IRI + + term = create_term('http://example.org/Alice', otype='u') + + assert term.type == IRI + assert term.iri == 'http://example.org/Alice' + + def test_create_term_with_literal_otype(self): + """Test create_term creates LITERAL Term for otype='l'""" + from trustgraph.query.triples.cassandra.service import create_term + from trustgraph.schema import LITERAL + + term = create_term('Alice Smith', otype='l', dtype='xsd:string', lang='en') + + assert term.type == LITERAL + assert term.value == 'Alice Smith' + assert term.datatype == 'xsd:string' + assert term.language == 'en' + + def test_create_term_with_triple_otype(self): + """Test create_term creates IRI Term for otype='t'""" + from trustgraph.query.triples.cassandra.service import create_term + from trustgraph.schema import IRI + + term = create_term('http://example.org/statement1', otype='t') + + assert term.type == IRI + assert term.iri == 'http://example.org/statement1' + + def test_create_term_heuristic_fallback_uri(self): + """Test create_term uses URL heuristic when otype not provided""" + from trustgraph.query.triples.cassandra.service import create_term + from trustgraph.schema import IRI + + term = create_term('http://example.org/Alice') + + assert term.type == IRI + assert term.iri == 'http://example.org/Alice' + + def test_create_term_heuristic_fallback_literal(self): + """Test create_term uses literal heuristic when otype not provided""" + from trustgraph.query.triples.cassandra.service import create_term + from trustgraph.schema import LITERAL + + term = create_term('Alice Smith') + + assert term.type == LITERAL + assert term.value == 'Alice Smith' diff --git a/tests/unit/test_query/test_triples_cassandra_query.py b/tests/unit/test_query/test_triples_cassandra_query.py index 0c5dc29c..480f2ee1 100644 --- a/tests/unit/test_query/test_triples_cassandra_query.py +++ b/tests/unit/test_query/test_triples_cassandra_query.py @@ -70,25 +70,29 @@ class TestCassandraQueryProcessor: assert result.type == LITERAL @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_query_triples_spo_query(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_query_triples_spo_query(self, mock_kg_class): """Test querying triples with subject, predicate, and object specified""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL - - # Setup mock TrustGraph + + # Setup mock TrustGraph via factory function mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance + mock_kg_class.return_value = mock_tg_instance # SPO query returns a list of results (with mock graph attribute) mock_result = MagicMock() - mock_result.g = None + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None + mock_result.o = 'test_object' mock_tg_instance.get_spo.return_value = [mock_result] - + processor = Processor( taskgroup=MagicMock(), id='test-cassandra-query', cassandra_host='localhost' ) - + # Create query request with all SPO values query = TriplesQueryRequest( user='test_user', @@ -98,20 +102,20 @@ class TestCassandraQueryProcessor: o=Term(type=LITERAL, value='test_object'), limit=100 ) - + result = await processor.query_triples(query) - + # Verify KnowledgeGraph was created with correct parameters - mock_trustgraph.assert_called_once_with( + mock_kg_class.assert_called_once_with( hosts=['localhost'], keyspace='test_user' ) - + # Verify get_spo was called with correct parameters mock_tg_instance.get_spo.assert_called_once_with( 'test_collection', 'test_subject', 'test_predicate', 'test_object', g=None, limit=100 ) - + # Verify result contains the queried triple assert len(result) == 1 assert result[0].s.value == 'test_subject' @@ -146,21 +150,25 @@ class TestCassandraQueryProcessor: assert processor.table is None @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_query_triples_sp_pattern(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_query_triples_sp_pattern(self, mock_kg_class): """Test SP query pattern (subject and predicate, no object)""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL - - # Setup mock TrustGraph and response + + # Setup mock TrustGraph via factory function mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + mock_result = MagicMock() mock_result.o = 'result_object' + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None mock_tg_instance.get_sp.return_value = [mock_result] - + processor = Processor(taskgroup=MagicMock()) - + query = TriplesQueryRequest( user='test_user', collection='test_collection', @@ -169,9 +177,9 @@ class TestCassandraQueryProcessor: o=None, limit=50 ) - + result = await processor.query_triples(query) - + mock_tg_instance.get_sp.assert_called_once_with('test_collection', 'test_subject', 'test_predicate', g=None, limit=50) assert len(result) == 1 assert result[0].s.value == 'test_subject' @@ -179,21 +187,25 @@ class TestCassandraQueryProcessor: assert result[0].o.value == 'result_object' @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_query_triples_s_pattern(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_query_triples_s_pattern(self, mock_kg_class): """Test S query pattern (subject only)""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL - + mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + mock_result = MagicMock() mock_result.p = 'result_predicate' mock_result.o = 'result_object' + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None mock_tg_instance.get_s.return_value = [mock_result] - + processor = Processor(taskgroup=MagicMock()) - + query = TriplesQueryRequest( user='test_user', collection='test_collection', @@ -202,9 +214,9 @@ class TestCassandraQueryProcessor: o=None, limit=25 ) - + result = await processor.query_triples(query) - + mock_tg_instance.get_s.assert_called_once_with('test_collection', 'test_subject', g=None, limit=25) assert len(result) == 1 assert result[0].s.value == 'test_subject' @@ -212,21 +224,25 @@ class TestCassandraQueryProcessor: assert result[0].o.value == 'result_object' @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_query_triples_p_pattern(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_query_triples_p_pattern(self, mock_kg_class): """Test P query pattern (predicate only)""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL - + mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + mock_result = MagicMock() mock_result.s = 'result_subject' mock_result.o = 'result_object' + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None mock_tg_instance.get_p.return_value = [mock_result] - + processor = Processor(taskgroup=MagicMock()) - + query = TriplesQueryRequest( user='test_user', collection='test_collection', @@ -235,9 +251,9 @@ class TestCassandraQueryProcessor: o=None, limit=10 ) - + result = await processor.query_triples(query) - + mock_tg_instance.get_p.assert_called_once_with('test_collection', 'test_predicate', g=None, limit=10) assert len(result) == 1 assert result[0].s.value == 'result_subject' @@ -245,21 +261,25 @@ class TestCassandraQueryProcessor: assert result[0].o.value == 'result_object' @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_query_triples_o_pattern(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_query_triples_o_pattern(self, mock_kg_class): """Test O query pattern (object only)""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL - + mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + mock_result = MagicMock() mock_result.s = 'result_subject' mock_result.p = 'result_predicate' + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None mock_tg_instance.get_o.return_value = [mock_result] - + processor = Processor(taskgroup=MagicMock()) - + query = TriplesQueryRequest( user='test_user', collection='test_collection', @@ -268,9 +288,9 @@ class TestCassandraQueryProcessor: o=Term(type=LITERAL, value='test_object'), limit=75 ) - + result = await processor.query_triples(query) - + mock_tg_instance.get_o.assert_called_once_with('test_collection', 'test_object', g=None, limit=75) assert len(result) == 1 assert result[0].s.value == 'result_subject' @@ -278,22 +298,26 @@ class TestCassandraQueryProcessor: assert result[0].o.value == 'test_object' @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_query_triples_get_all_pattern(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_query_triples_get_all_pattern(self, mock_kg_class): """Test query pattern with no constraints (get all)""" from trustgraph.schema import TriplesQueryRequest - + mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + mock_result = MagicMock() mock_result.s = 'all_subject' mock_result.p = 'all_predicate' mock_result.o = 'all_object' + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None mock_tg_instance.get_all.return_value = [mock_result] - + processor = Processor(taskgroup=MagicMock()) - + query = TriplesQueryRequest( user='test_user', collection='test_collection', @@ -302,9 +326,9 @@ class TestCassandraQueryProcessor: o=None, limit=1000 ) - + result = await processor.query_triples(query) - + mock_tg_instance.get_all.assert_called_once_with('test_collection', limit=1000) assert len(result) == 1 assert result[0].s.value == 'all_subject' @@ -378,16 +402,20 @@ class TestCassandraQueryProcessor: mock_launch.assert_called_once_with(default_ident, '\nTriples query service. Input is a (s, p, o, g) quad pattern, some values may be\nnull. Output is a list of quads.\n') @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_query_triples_with_authentication(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_query_triples_with_authentication(self, mock_kg_class): """Test querying with username and password authentication""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL - + mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance + mock_kg_class.return_value = mock_tg_instance # SPO query returns a list of results mock_result = MagicMock() - mock_result.g = None + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None + mock_result.o = 'test_object' mock_tg_instance.get_spo.return_value = [mock_result] processor = Processor( @@ -395,7 +423,7 @@ class TestCassandraQueryProcessor: cassandra_username='authuser', cassandra_password='authpass' ) - + query = TriplesQueryRequest( user='test_user', collection='test_collection', @@ -404,11 +432,11 @@ class TestCassandraQueryProcessor: o=Term(type=LITERAL, value='test_object'), limit=100 ) - + await processor.query_triples(query) - + # Verify KnowledgeGraph was created with authentication - mock_trustgraph.assert_called_once_with( + mock_kg_class.assert_called_once_with( hosts=['cassandra'], # Updated default keyspace='test_user', username='authuser', @@ -416,16 +444,20 @@ class TestCassandraQueryProcessor: ) @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_query_triples_table_reuse(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_query_triples_table_reuse(self, mock_kg_class): """Test that TrustGraph is reused for same table""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL - + mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance + mock_kg_class.return_value = mock_tg_instance # SPO query returns a list of results mock_result = MagicMock() - mock_result.g = None + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None + mock_result.o = 'test_object' mock_tg_instance.get_spo.return_value = [mock_result] processor = Processor(taskgroup=MagicMock()) @@ -441,24 +473,35 @@ class TestCassandraQueryProcessor: # First query should create TrustGraph await processor.query_triples(query) - assert mock_trustgraph.call_count == 1 + assert mock_kg_class.call_count == 1 # Second query with same table should reuse TrustGraph await processor.query_triples(query) - assert mock_trustgraph.call_count == 1 # Should not increase + assert mock_kg_class.call_count == 1 # Should not increase @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_query_triples_table_switching(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_query_triples_table_switching(self, mock_kg_class): """Test table switching creates new TrustGraph""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL - + mock_tg_instance1 = MagicMock() mock_tg_instance2 = MagicMock() - mock_trustgraph.side_effect = [mock_tg_instance1, mock_tg_instance2] - + mock_kg_class.side_effect = [mock_tg_instance1, mock_tg_instance2] + + # Setup mock results for both instances + mock_result = MagicMock() + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None + mock_result.p = 'p' + mock_result.o = 'o' + mock_tg_instance1.get_s.return_value = [mock_result] + mock_tg_instance2.get_s.return_value = [mock_result] + processor = Processor(taskgroup=MagicMock()) - + # First query query1 = TriplesQueryRequest( user='user1', @@ -468,10 +511,10 @@ class TestCassandraQueryProcessor: o=None, limit=100 ) - + await processor.query_triples(query1) assert processor.table == 'user1' - + # Second query with different table query2 = TriplesQueryRequest( user='user2', @@ -481,25 +524,25 @@ class TestCassandraQueryProcessor: o=None, limit=100 ) - + await processor.query_triples(query2) assert processor.table == 'user2' - + # Verify TrustGraph was created twice - assert mock_trustgraph.call_count == 2 + assert mock_kg_class.call_count == 2 @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_query_triples_exception_handling(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_query_triples_exception_handling(self, mock_kg_class): """Test exception handling during query execution""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL - + mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance + mock_kg_class.return_value = mock_tg_instance mock_tg_instance.get_spo.side_effect = Exception("Query failed") - + processor = Processor(taskgroup=MagicMock()) - + query = TriplesQueryRequest( user='test_user', collection='test_collection', @@ -508,28 +551,36 @@ class TestCassandraQueryProcessor: o=Term(type=LITERAL, value='test_object'), limit=100 ) - + with pytest.raises(Exception, match="Query failed"): await processor.query_triples(query) @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_query_triples_multiple_results(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_query_triples_multiple_results(self, mock_kg_class): """Test query returning multiple results""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL - + mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + # Mock multiple results mock_result1 = MagicMock() mock_result1.o = 'object1' + mock_result1.g = '' + mock_result1.otype = None + mock_result1.dtype = None + mock_result1.lang = None mock_result2 = MagicMock() mock_result2.o = 'object2' + mock_result2.g = '' + mock_result2.otype = None + mock_result2.dtype = None + mock_result2.lang = None mock_tg_instance.get_sp.return_value = [mock_result1, mock_result2] - + processor = Processor(taskgroup=MagicMock()) - + query = TriplesQueryRequest( user='test_user', collection='test_collection', @@ -538,9 +589,9 @@ class TestCassandraQueryProcessor: o=None, limit=100 ) - + result = await processor.query_triples(query) - + assert len(result) == 2 assert result[0].o.value == 'object1' assert result[1].o.value == 'object2' @@ -550,16 +601,20 @@ class TestCassandraQueryPerformanceOptimizations: """Test cases for multi-table performance optimizations in query service""" @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_get_po_query_optimization(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_get_po_query_optimization(self, mock_kg_class): """Test that get_po queries use optimized table (no ALLOW FILTERING)""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance + mock_kg_class.return_value = mock_tg_instance mock_result = MagicMock() mock_result.s = 'result_subject' + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None mock_tg_instance.get_po.return_value = [mock_result] processor = Processor(taskgroup=MagicMock()) @@ -587,16 +642,20 @@ class TestCassandraQueryPerformanceOptimizations: assert result[0].o.value == 'test_object' @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_get_os_query_optimization(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_get_os_query_optimization(self, mock_kg_class): """Test that get_os queries use optimized table (no ALLOW FILTERING)""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance + mock_kg_class.return_value = mock_tg_instance mock_result = MagicMock() mock_result.p = 'result_predicate' + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None mock_tg_instance.get_os.return_value = [mock_result] processor = Processor(taskgroup=MagicMock()) @@ -624,13 +683,13 @@ class TestCassandraQueryPerformanceOptimizations: assert result[0].o.value == 'test_object' @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_all_query_patterns_use_correct_tables(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_all_query_patterns_use_correct_tables(self, mock_kg_class): """Test that all query patterns route to their optimal tables""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance + mock_kg_class.return_value = mock_tg_instance # Mock empty results for all queries mock_tg_instance.get_all.return_value = [] @@ -696,19 +755,23 @@ class TestCassandraQueryPerformanceOptimizations: # Mode is determined in KnowledgeGraph initialization @pytest.mark.asyncio - @patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph') - async def test_performance_critical_po_query_no_filtering(self, mock_trustgraph): + @patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph') + async def test_performance_critical_po_query_no_filtering(self, mock_kg_class): """Test the performance-critical PO query that eliminates ALLOW FILTERING""" from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance + mock_kg_class.return_value = mock_tg_instance # Mock multiple subjects for the same predicate-object pair mock_results = [] for i in range(5): mock_result = MagicMock() mock_result.s = f'subject_{i}' + mock_result.g = '' + mock_result.otype = None + mock_result.dtype = None + mock_result.lang = None mock_results.append(mock_result) mock_tg_instance.get_po.return_value = mock_results diff --git a/tests/unit/test_storage/test_triples_cassandra_storage.py b/tests/unit/test_storage/test_triples_cassandra_storage.py index 3fdff6b9..73272942 100644 --- a/tests/unit/test_storage/test_triples_cassandra_storage.py +++ b/tests/unit/test_storage/test_triples_cassandra_storage.py @@ -6,7 +6,7 @@ import pytest from unittest.mock import MagicMock, patch, AsyncMock from trustgraph.storage.triples.cassandra.write import Processor -from trustgraph.schema import Triple, LITERAL +from trustgraph.schema import Triple, LITERAL, IRI from trustgraph.direct.cassandra_kg import DEFAULT_GRAPH @@ -87,29 +87,29 @@ class TestCassandraStorageProcessor: assert processor.cassandra_username == 'new-user' # Only cassandra_* params work @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') - async def test_table_switching_with_auth(self, mock_trustgraph): + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') + async def test_table_switching_with_auth(self, mock_kg_class): """Test table switching logic when authentication is provided""" taskgroup_mock = MagicMock() mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + processor = Processor( taskgroup=taskgroup_mock, cassandra_username='testuser', cassandra_password='testpass' ) - + # Create mock message mock_message = MagicMock() mock_message.metadata.user = 'user1' mock_message.metadata.collection = 'collection1' mock_message.triples = [] - + await processor.store_triples(mock_message) - + # Verify KnowledgeGraph was called with auth parameters - mock_trustgraph.assert_called_once_with( + mock_kg_class.assert_called_once_with( hosts=['cassandra'], # Updated default keyspace='user1', username='testuser', @@ -118,81 +118,89 @@ class TestCassandraStorageProcessor: assert processor.table == 'user1' @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') - async def test_table_switching_without_auth(self, mock_trustgraph): + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') + async def test_table_switching_without_auth(self, mock_kg_class): """Test table switching logic when no authentication is provided""" taskgroup_mock = MagicMock() mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + processor = Processor(taskgroup=taskgroup_mock) - + # Create mock message mock_message = MagicMock() mock_message.metadata.user = 'user2' mock_message.metadata.collection = 'collection2' mock_message.triples = [] - + await processor.store_triples(mock_message) - + # Verify KnowledgeGraph was called without auth parameters - mock_trustgraph.assert_called_once_with( + mock_kg_class.assert_called_once_with( hosts=['cassandra'], # Updated default keyspace='user2' ) assert processor.table == 'user2' @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') - async def test_table_reuse_when_same(self, mock_trustgraph): + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') + async def test_table_reuse_when_same(self, mock_kg_class): """Test that TrustGraph is not recreated when table hasn't changed""" taskgroup_mock = MagicMock() mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + processor = Processor(taskgroup=taskgroup_mock) - + # Create mock message mock_message = MagicMock() mock_message.metadata.user = 'user1' mock_message.metadata.collection = 'collection1' mock_message.triples = [] - + # First call should create TrustGraph await processor.store_triples(mock_message) - assert mock_trustgraph.call_count == 1 - + assert mock_kg_class.call_count == 1 + # Second call with same table should reuse TrustGraph await processor.store_triples(mock_message) - assert mock_trustgraph.call_count == 1 # Should not increase + assert mock_kg_class.call_count == 1 # Should not increase @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') - async def test_triple_insertion(self, mock_trustgraph): + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') + async def test_triple_insertion(self, mock_kg_class): """Test that triples are properly inserted into Cassandra""" taskgroup_mock = MagicMock() mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + processor = Processor(taskgroup=taskgroup_mock) - + # Create mock triples with proper Term structure triple1 = MagicMock() triple1.s.type = LITERAL triple1.s.value = 'subject1' + triple1.s.datatype = '' + triple1.s.language = '' triple1.p.type = LITERAL triple1.p.value = 'predicate1' triple1.o.type = LITERAL triple1.o.value = 'object1' + triple1.o.datatype = '' + triple1.o.language = '' triple1.g = None triple2 = MagicMock() triple2.s.type = LITERAL triple2.s.value = 'subject2' + triple2.s.datatype = '' + triple2.s.language = '' triple2.p.type = LITERAL triple2.p.value = 'predicate2' triple2.o.type = LITERAL triple2.o.value = 'object2' + triple2.o.datatype = '' + triple2.o.language = '' triple2.g = None # Create mock message @@ -203,51 +211,57 @@ class TestCassandraStorageProcessor: await processor.store_triples(mock_message) - # Verify both triples were inserted (with g= parameter) + # Verify both triples were inserted (with g=, otype=, dtype=, lang= parameters) assert mock_tg_instance.insert.call_count == 2 - mock_tg_instance.insert.assert_any_call('collection1', 'subject1', 'predicate1', 'object1', g=DEFAULT_GRAPH) - mock_tg_instance.insert.assert_any_call('collection1', 'subject2', 'predicate2', 'object2', g=DEFAULT_GRAPH) + mock_tg_instance.insert.assert_any_call( + 'collection1', 'subject1', 'predicate1', 'object1', + g=DEFAULT_GRAPH, otype='l', dtype='', lang='' + ) + mock_tg_instance.insert.assert_any_call( + 'collection1', 'subject2', 'predicate2', 'object2', + g=DEFAULT_GRAPH, otype='l', dtype='', lang='' + ) @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') - async def test_triple_insertion_with_empty_list(self, mock_trustgraph): + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') + async def test_triple_insertion_with_empty_list(self, mock_kg_class): """Test behavior when message has no triples""" taskgroup_mock = MagicMock() mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + processor = Processor(taskgroup=taskgroup_mock) - + # Create mock message with empty triples mock_message = MagicMock() mock_message.metadata.user = 'user1' mock_message.metadata.collection = 'collection1' mock_message.triples = [] - + await processor.store_triples(mock_message) - + # Verify no triples were inserted mock_tg_instance.insert.assert_not_called() @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') @patch('trustgraph.storage.triples.cassandra.write.time.sleep') - async def test_exception_handling_with_retry(self, mock_sleep, mock_trustgraph): + async def test_exception_handling_with_retry(self, mock_sleep, mock_kg_class): """Test exception handling during TrustGraph creation""" taskgroup_mock = MagicMock() - mock_trustgraph.side_effect = Exception("Connection failed") - + mock_kg_class.side_effect = Exception("Connection failed") + processor = Processor(taskgroup=taskgroup_mock) - + # Create mock message mock_message = MagicMock() mock_message.metadata.user = 'user1' mock_message.metadata.collection = 'collection1' mock_message.triples = [] - + with pytest.raises(Exception, match="Connection failed"): await processor.store_triples(mock_message) - + # Verify sleep was called before re-raising mock_sleep.assert_called_once_with(1) @@ -335,57 +349,61 @@ class TestCassandraStorageProcessor: mock_launch.assert_called_once_with(default_ident, '\nGraph writer. Input is graph edge. Writes edges to Cassandra graph.\n') @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') - async def test_store_triples_table_switching_between_different_tables(self, mock_trustgraph): + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') + async def test_store_triples_table_switching_between_different_tables(self, mock_kg_class): """Test table switching when different tables are used in sequence""" taskgroup_mock = MagicMock() mock_tg_instance1 = MagicMock() mock_tg_instance2 = MagicMock() - mock_trustgraph.side_effect = [mock_tg_instance1, mock_tg_instance2] - + mock_kg_class.side_effect = [mock_tg_instance1, mock_tg_instance2] + processor = Processor(taskgroup=taskgroup_mock) - + # First message with table1 mock_message1 = MagicMock() mock_message1.metadata.user = 'user1' mock_message1.metadata.collection = 'collection1' mock_message1.triples = [] - + await processor.store_triples(mock_message1) assert processor.table == 'user1' assert processor.tg == mock_tg_instance1 - + # Second message with different table mock_message2 = MagicMock() mock_message2.metadata.user = 'user2' mock_message2.metadata.collection = 'collection2' mock_message2.triples = [] - + await processor.store_triples(mock_message2) assert processor.table == 'user2' assert processor.tg == mock_tg_instance2 - + # Verify TrustGraph was created twice for different tables - assert mock_trustgraph.call_count == 2 + assert mock_kg_class.call_count == 2 @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') - async def test_store_triples_with_special_characters_in_values(self, mock_trustgraph): + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') + async def test_store_triples_with_special_characters_in_values(self, mock_kg_class): """Test storing triples with special characters and unicode""" taskgroup_mock = MagicMock() mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance - + mock_kg_class.return_value = mock_tg_instance + processor = Processor(taskgroup=taskgroup_mock) - + # Create triple with special characters and proper Term structure triple = MagicMock() triple.s.type = LITERAL triple.s.value = 'subject with spaces & symbols' + triple.s.datatype = '' + triple.s.language = '' triple.p.type = LITERAL triple.p.value = 'predicate:with/colons' triple.o.type = LITERAL triple.o.value = 'object with "quotes" and unicode: ñáéíóú' + triple.o.datatype = '' + triple.o.language = '' triple.g = None mock_message = MagicMock() @@ -401,31 +419,34 @@ class TestCassandraStorageProcessor: 'subject with spaces & symbols', 'predicate:with/colons', 'object with "quotes" and unicode: ñáéíóú', - g=DEFAULT_GRAPH + g=DEFAULT_GRAPH, + otype='l', + dtype='', + lang='' ) @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') - async def test_store_triples_preserves_old_table_on_exception(self, mock_trustgraph): + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') + async def test_store_triples_preserves_old_table_on_exception(self, mock_kg_class): """Test that table remains unchanged when TrustGraph creation fails""" taskgroup_mock = MagicMock() - + processor = Processor(taskgroup=taskgroup_mock) - + # Set an initial table processor.table = ('old_user', 'old_collection') - + # Mock TrustGraph to raise exception - mock_trustgraph.side_effect = Exception("Connection failed") - + mock_kg_class.side_effect = Exception("Connection failed") + mock_message = MagicMock() mock_message.metadata.user = 'new_user' mock_message.metadata.collection = 'new_collection' mock_message.triples = [] - + with pytest.raises(Exception, match="Connection failed"): await processor.store_triples(mock_message) - + # Table should remain unchanged since self.table = table happens after try/except assert processor.table == ('old_user', 'old_collection') # TrustGraph should be set to None though @@ -436,12 +457,12 @@ class TestCassandraPerformanceOptimizations: """Test cases for multi-table performance optimizations""" @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') - async def test_legacy_mode_uses_single_table(self, mock_trustgraph): + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') + async def test_legacy_mode_uses_single_table(self, mock_kg_class): """Test that legacy mode still works with single table""" taskgroup_mock = MagicMock() mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance + mock_kg_class.return_value = mock_tg_instance with patch.dict('os.environ', {'CASSANDRA_USE_LEGACY': 'true'}): processor = Processor(taskgroup=taskgroup_mock) @@ -454,16 +475,15 @@ class TestCassandraPerformanceOptimizations: await processor.store_triples(mock_message) # Verify KnowledgeGraph instance uses legacy mode - kg_instance = mock_trustgraph.return_value - assert kg_instance is not None + assert mock_tg_instance is not None @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') - async def test_optimized_mode_uses_multi_table(self, mock_trustgraph): + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') + async def test_optimized_mode_uses_multi_table(self, mock_kg_class): """Test that optimized mode uses multi-table schema""" taskgroup_mock = MagicMock() mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance + mock_kg_class.return_value = mock_tg_instance with patch.dict('os.environ', {'CASSANDRA_USE_LEGACY': 'false'}): processor = Processor(taskgroup=taskgroup_mock) @@ -476,16 +496,15 @@ class TestCassandraPerformanceOptimizations: await processor.store_triples(mock_message) # Verify KnowledgeGraph instance is in optimized mode - kg_instance = mock_trustgraph.return_value - assert kg_instance is not None + assert mock_tg_instance is not None @pytest.mark.asyncio - @patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph') - async def test_batch_write_consistency(self, mock_trustgraph): + @patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph') + async def test_batch_write_consistency(self, mock_kg_class): """Test that all tables stay consistent during batch writes""" taskgroup_mock = MagicMock() mock_tg_instance = MagicMock() - mock_trustgraph.return_value = mock_tg_instance + mock_kg_class.return_value = mock_tg_instance processor = Processor(taskgroup=taskgroup_mock) @@ -493,10 +512,14 @@ class TestCassandraPerformanceOptimizations: triple = MagicMock() triple.s.type = LITERAL triple.s.value = 'test_subject' + triple.s.datatype = '' + triple.s.language = '' triple.p.type = LITERAL triple.p.value = 'test_predicate' triple.o.type = LITERAL triple.o.value = 'test_object' + triple.o.datatype = '' + triple.o.language = '' triple.g = None mock_message = MagicMock() @@ -509,7 +532,7 @@ class TestCassandraPerformanceOptimizations: # Verify insert was called for the triple (implementation details tested in KnowledgeGraph) mock_tg_instance.insert.assert_called_once_with( 'collection1', 'test_subject', 'test_predicate', 'test_object', - g=DEFAULT_GRAPH + g=DEFAULT_GRAPH, otype='l', dtype='', lang='' ) def test_environment_variable_controls_mode(self): diff --git a/trustgraph-flow/trustgraph/direct/cassandra_kg.py b/trustgraph-flow/trustgraph/direct/cassandra_kg.py index f8a20041..61639096 100644 --- a/trustgraph-flow/trustgraph/direct/cassandra_kg.py +++ b/trustgraph-flow/trustgraph/direct/cassandra_kg.py @@ -20,6 +20,11 @@ DEFAULT_GRAPH = "" class KnowledgeGraph: """ + REDUNDANT: This 7-table implementation has been superseded by + EntityCentricKnowledgeGraph which uses a more efficient 2-table model. + This class is retained temporarily for reference but should not be used + for new deployments. + Cassandra-backed knowledge graph supporting quads (s, p, o, g). Uses 7 tables to support all 16 query patterns efficiently: @@ -516,3 +521,575 @@ class KnowledgeGraph: self.cluster.shutdown() if self.cluster in _active_clusters: _active_clusters.remove(self.cluster) + + +class EntityCentricKnowledgeGraph: + """ + Entity-centric Cassandra-backed knowledge graph supporting quads (s, p, o, g). + + Uses 2 tables instead of 7: + - quads_by_entity: every entity knows every quad it participates in + - quads_by_collection: manifest for collection-level queries and deletion + + Supports all 16 query patterns with single-partition reads. + """ + + def __init__( + self, hosts=None, + keyspace="trustgraph", username=None, password=None + ): + + if hosts is None: + hosts = ["localhost"] + + self.keyspace = keyspace + self.username = username + + # 2-table entity-centric schema + self.entity_table = "quads_by_entity" + self.collection_table = "quads_by_collection" + + # Collection metadata tracking + self.collection_metadata_table = "collection_metadata" + + if username and password: + ssl_context = SSLContext(PROTOCOL_TLSv1_2) + auth_provider = PlainTextAuthProvider(username=username, password=password) + self.cluster = Cluster(hosts, auth_provider=auth_provider, ssl_context=ssl_context) + else: + self.cluster = Cluster(hosts) + self.session = self.cluster.connect() + + # Track this cluster globally + _active_clusters.append(self.cluster) + + self.init() + self.prepare_statements() + + def clear(self): + self.session.execute(f""" + drop keyspace if exists {self.keyspace}; + """) + self.init() + + def init(self): + self.session.execute(f""" + create keyspace if not exists {self.keyspace} + with replication = {{ + 'class' : 'SimpleStrategy', + 'replication_factor' : 1 + }}; + """) + + self.session.set_keyspace(self.keyspace) + self.init_entity_centric_schema() + + def init_entity_centric_schema(self): + """Initialize 2-table entity-centric schema""" + + # quads_by_entity: primary data table + # Every entity has a partition containing all quads it participates in + self.session.execute(f""" + CREATE TABLE IF NOT EXISTS {self.entity_table} ( + collection text, + entity text, + role text, + p text, + otype text, + s text, + o text, + d text, + dtype text, + lang text, + PRIMARY KEY ((collection, entity), role, p, otype, s, o, d) + ); + """) + + # quads_by_collection: manifest for collection-level queries and deletion + self.session.execute(f""" + CREATE TABLE IF NOT EXISTS {self.collection_table} ( + collection text, + d text, + s text, + p text, + o text, + otype text, + dtype text, + lang text, + PRIMARY KEY (collection, d, s, p, o) + ); + """) + + # Collection metadata tracking + self.session.execute(f""" + CREATE TABLE IF NOT EXISTS {self.collection_metadata_table} ( + collection text, + created_at timestamp, + PRIMARY KEY (collection) + ); + """) + + logger.info("Entity-centric schema initialized (2 tables + metadata)") + + def prepare_statements(self): + """Prepare statements for entity-centric schema""" + + # Insert statement for quads_by_entity + self.insert_entity_stmt = self.session.prepare( + f"INSERT INTO {self.entity_table} " + "(collection, entity, role, p, otype, s, o, d, dtype, lang) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + ) + + # Insert statement for quads_by_collection + self.insert_collection_stmt = self.session.prepare( + f"INSERT INTO {self.collection_table} " + "(collection, d, s, p, o, otype, dtype, lang) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)" + ) + + # Query statements for quads_by_entity + + # Get all quads for an entity (any role) + self.get_entity_all_stmt = self.session.prepare( + f"SELECT role, p, otype, s, o, d, dtype, lang FROM {self.entity_table} " + "WHERE collection = ? AND entity = ? LIMIT ?" + ) + + # Get quads where entity is subject (role='S') + self.get_entity_as_s_stmt = self.session.prepare( + f"SELECT p, otype, s, o, d, dtype, lang FROM {self.entity_table} " + "WHERE collection = ? AND entity = ? AND role = 'S' LIMIT ?" + ) + + # Get quads where entity is subject with specific predicate + self.get_entity_as_s_p_stmt = self.session.prepare( + f"SELECT otype, s, o, d, dtype, lang FROM {self.entity_table} " + "WHERE collection = ? AND entity = ? AND role = 'S' AND p = ? LIMIT ?" + ) + + # Get quads where entity is subject with specific predicate and otype + self.get_entity_as_s_p_otype_stmt = self.session.prepare( + f"SELECT s, o, d, dtype, lang FROM {self.entity_table} " + "WHERE collection = ? AND entity = ? AND role = 'S' AND p = ? AND otype = ? LIMIT ?" + ) + + # Get quads where entity is predicate (role='P') + self.get_entity_as_p_stmt = self.session.prepare( + f"SELECT p, otype, s, o, d, dtype, lang FROM {self.entity_table} " + "WHERE collection = ? AND entity = ? AND role = 'P' LIMIT ?" + ) + + # Get quads where entity is object (role='O') + self.get_entity_as_o_stmt = self.session.prepare( + f"SELECT p, otype, s, o, d, dtype, lang FROM {self.entity_table} " + "WHERE collection = ? AND entity = ? AND role = 'O' LIMIT ?" + ) + + # Get quads where entity is object with specific predicate + self.get_entity_as_o_p_stmt = self.session.prepare( + f"SELECT otype, s, o, d, dtype, lang FROM {self.entity_table} " + "WHERE collection = ? AND entity = ? AND role = 'O' AND p = ? LIMIT ?" + ) + + # Get quads where entity is graph (role='G') + self.get_entity_as_g_stmt = self.session.prepare( + f"SELECT p, otype, s, o, d, dtype, lang FROM {self.entity_table} " + "WHERE collection = ? AND entity = ? AND role = 'G' LIMIT ?" + ) + + # Query statements for quads_by_collection + + # Get all quads in collection + self.get_collection_all_stmt = self.session.prepare( + f"SELECT d, s, p, o, otype, dtype, lang FROM {self.collection_table} " + "WHERE collection = ? LIMIT ?" + ) + + # Get all quads in a specific graph + self.get_collection_by_graph_stmt = self.session.prepare( + f"SELECT s, p, o, otype, dtype, lang FROM {self.collection_table} " + "WHERE collection = ? AND d = ? LIMIT ?" + ) + + # Delete statements + self.delete_entity_partition_stmt = self.session.prepare( + f"DELETE FROM {self.entity_table} WHERE collection = ? AND entity = ?" + ) + + self.delete_collection_row_stmt = self.session.prepare( + f"DELETE FROM {self.collection_table} WHERE collection = ? AND d = ? AND s = ? AND p = ? AND o = ?" + ) + + logger.info("Prepared statements initialized for entity-centric schema") + + def insert(self, collection, s, p, o, g=None, otype=None, dtype="", lang=""): + """ + Insert a quad into entity-centric tables. + + Writes 4 rows to quads_by_entity (one for each entity role) + 1 row to + quads_by_collection. For literals, only 3 entity rows are written since + literals are not independently queryable entities. + + Args: + collection: Collection/tenant scope + s: Subject (string value) + p: Predicate (string value) + o: Object (string value) + g: Graph/dataset (None for default graph) + otype: Object type - 'u' (URI), 'l' (literal), 't' (triple) + Auto-detected from o value if not provided + dtype: XSD datatype (for literals) + lang: Language tag (for literals) + """ + # Default graph stored as empty string + if g is None: + g = DEFAULT_GRAPH + + # Auto-detect otype if not provided (backwards compatibility) + if otype is None: + if o.startswith("http://") or o.startswith("https://"): + otype = "u" + else: + otype = "l" + + batch = BatchStatement() + + # Write row for subject entity (role='S') + batch.add(self.insert_entity_stmt, ( + collection, s, 'S', p, otype, s, o, g, dtype, lang + )) + + # Write row for predicate entity (role='P') + batch.add(self.insert_entity_stmt, ( + collection, p, 'P', p, otype, s, o, g, dtype, lang + )) + + # Write row for object entity (role='O') - only for URIs, not literals + if otype == 'u' or otype == 't': + batch.add(self.insert_entity_stmt, ( + collection, o, 'O', p, otype, s, o, g, dtype, lang + )) + + # Write row for graph entity (role='G') - only for non-default graphs + if g != DEFAULT_GRAPH: + batch.add(self.insert_entity_stmt, ( + collection, g, 'G', p, otype, s, o, g, dtype, lang + )) + + # Write row to quads_by_collection + batch.add(self.insert_collection_stmt, ( + collection, g, s, p, o, otype, dtype, lang + )) + + self.session.execute(batch) + + # ======================================================================== + # Query methods + # g=None means default graph, g="*" means all graphs + # Results include otype, dtype, lang for proper Term reconstruction + # ======================================================================== + + def get_all(self, collection, limit=50): + """Get all quads in collection""" + return self.session.execute(self.get_collection_all_stmt, (collection, limit)) + + def get_s(self, collection, s, g=None, limit=10): + """ + Query by subject. Returns quads where s is the subject. + g=None: default graph, g='*': all graphs + """ + rows = self.session.execute(self.get_entity_as_s_stmt, (collection, s, limit)) + + results = [] + for row in rows: + d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH + # Filter by graph if specified + if g is None or g == DEFAULT_GRAPH: + if d != DEFAULT_GRAPH: + continue + elif g != GRAPH_WILDCARD and d != g: + continue + + results.append(QuadResult( + s=row.s, p=row.p, o=row.o, g=d, + otype=row.otype, dtype=row.dtype, lang=row.lang + )) + + return results + + def get_p(self, collection, p, g=None, limit=10): + """Query by predicate""" + rows = self.session.execute(self.get_entity_as_p_stmt, (collection, p, limit)) + + results = [] + for row in rows: + d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH + if g is None or g == DEFAULT_GRAPH: + if d != DEFAULT_GRAPH: + continue + elif g != GRAPH_WILDCARD and d != g: + continue + + results.append(QuadResult( + s=row.s, p=row.p, o=row.o, g=d, + otype=row.otype, dtype=row.dtype, lang=row.lang + )) + + return results + + def get_o(self, collection, o, g=None, limit=10): + """Query by object""" + rows = self.session.execute(self.get_entity_as_o_stmt, (collection, o, limit)) + + results = [] + for row in rows: + d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH + if g is None or g == DEFAULT_GRAPH: + if d != DEFAULT_GRAPH: + continue + elif g != GRAPH_WILDCARD and d != g: + continue + + results.append(QuadResult( + s=row.s, p=row.p, o=row.o, g=d, + otype=row.otype, dtype=row.dtype, lang=row.lang + )) + + return results + + def get_sp(self, collection, s, p, g=None, limit=10): + """Query by subject and predicate""" + rows = self.session.execute(self.get_entity_as_s_p_stmt, (collection, s, p, limit)) + + results = [] + for row in rows: + d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH + if g is None or g == DEFAULT_GRAPH: + if d != DEFAULT_GRAPH: + continue + elif g != GRAPH_WILDCARD and d != g: + continue + + results.append(QuadResult( + s=s, p=p, o=row.o, g=d, + otype=row.otype, dtype=row.dtype, lang=row.lang + )) + + return results + + def get_po(self, collection, p, o, g=None, limit=10): + """Query by predicate and object""" + rows = self.session.execute(self.get_entity_as_o_p_stmt, (collection, o, p, limit)) + + results = [] + for row in rows: + d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH + if g is None or g == DEFAULT_GRAPH: + if d != DEFAULT_GRAPH: + continue + elif g != GRAPH_WILDCARD and d != g: + continue + + results.append(QuadResult( + s=row.s, p=p, o=o, g=d, + otype=row.otype, dtype=row.dtype, lang=row.lang + )) + + return results + + def get_os(self, collection, o, s, g=None, limit=10): + """Query by object and subject""" + # Use subject partition with role='S', filter by o + rows = self.session.execute(self.get_entity_as_s_stmt, (collection, s, limit)) + + results = [] + for row in rows: + if row.o != o: + continue + + d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH + if g is None or g == DEFAULT_GRAPH: + if d != DEFAULT_GRAPH: + continue + elif g != GRAPH_WILDCARD and d != g: + continue + + results.append(QuadResult( + s=s, p=row.p, o=o, g=d, + otype=row.otype, dtype=row.dtype, lang=row.lang + )) + + return results + + def get_spo(self, collection, s, p, o, g=None, limit=10): + """Query by subject, predicate, object (find which graphs)""" + rows = self.session.execute(self.get_entity_as_s_p_stmt, (collection, s, p, limit)) + + results = [] + for row in rows: + if row.o != o: + continue + + d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH + if g is None or g == DEFAULT_GRAPH: + if d != DEFAULT_GRAPH: + continue + elif g != GRAPH_WILDCARD and d != g: + continue + + results.append(QuadResult( + s=s, p=p, o=o, g=d, + otype=row.otype, dtype=row.dtype, lang=row.lang + )) + + return results + + def get_g(self, collection, g, limit=50): + """Get all quads in a specific graph""" + if g is None: + g = DEFAULT_GRAPH + + return self.session.execute(self.get_collection_by_graph_stmt, (collection, g, limit)) + + # ======================================================================== + # Collection management + # ======================================================================== + + def collection_exists(self, collection): + """Check if collection exists""" + try: + result = self.session.execute( + f"SELECT collection FROM {self.collection_metadata_table} WHERE collection = %s LIMIT 1", + (collection,) + ) + return bool(list(result)) + except Exception as e: + logger.error(f"Error checking collection existence: {e}") + return False + + def create_collection(self, collection): + """Create collection by inserting metadata row""" + try: + import datetime + self.session.execute( + f"INSERT INTO {self.collection_metadata_table} (collection, created_at) VALUES (%s, %s)", + (collection, datetime.datetime.now()) + ) + logger.info(f"Created collection metadata for {collection}") + except Exception as e: + logger.error(f"Error creating collection: {e}") + raise e + + def delete_collection(self, collection): + """ + Delete all quads for a collection from both tables. + + Uses efficient partition-level deletes: + 1. Read quads from quads_by_collection to get all quads + 2. Extract unique entities (s, p, o for URIs, g for non-default) + 3. Delete entire entity partitions + 4. Delete collection rows + """ + # Read all quads from collection table + rows = self.session.execute( + f"SELECT d, s, p, o, otype FROM {self.collection_table} WHERE collection = %s", + (collection,) + ) + + # Collect unique entities and quad data for deletion + entities = set() + quads = [] + + for row in rows: + d, s, p, o, otype = row.d, row.s, row.p, row.o, row.otype + quads.append((d, s, p, o)) + + # Subject and predicate are always entities + entities.add(s) + entities.add(p) + + # Object is an entity only for URIs + if otype == 'u' or otype == 't': + entities.add(o) + + # Graph is an entity for non-default graphs + if d != DEFAULT_GRAPH: + entities.add(d) + + # Delete entity partitions (efficient partition-level deletes) + batch = BatchStatement() + count = 0 + + for entity in entities: + batch.add(self.delete_entity_partition_stmt, (collection, entity)) + count += 1 + + # Execute batch every 50 entities + if count % 50 == 0: + self.session.execute(batch) + batch = BatchStatement() + + # Execute remaining entity deletes + if count % 50 != 0: + self.session.execute(batch) + + # Delete collection rows + batch = BatchStatement() + count = 0 + + for d, s, p, o in quads: + batch.add(self.delete_collection_row_stmt, (collection, d, s, p, o)) + count += 1 + + # Execute batch every 50 quads + if count % 50 == 0: + self.session.execute(batch) + batch = BatchStatement() + + # Execute remaining collection row deletes + if count % 50 != 0: + self.session.execute(batch) + + # Delete collection metadata + self.session.execute( + f"DELETE FROM {self.collection_metadata_table} WHERE collection = %s", + (collection,) + ) + + logger.info(f"Deleted collection {collection}: {len(entities)} entity partitions, {len(quads)} quads") + + def close(self): + """Close connections""" + if hasattr(self, 'session') and self.session: + self.session.shutdown() + if hasattr(self, 'cluster') and self.cluster: + self.cluster.shutdown() + if self.cluster in _active_clusters: + _active_clusters.remove(self.cluster) + + +class QuadResult: + """ + Result object for quad queries, including object type metadata. + + Attributes: + s: Subject value + p: Predicate value + o: Object value + g: Graph/dataset value + otype: Object type - 'u' (URI), 'l' (literal), 't' (triple) + dtype: XSD datatype (for literals) + lang: Language tag (for literals) + """ + + def __init__(self, s, p, o, g, otype='u', dtype='', lang=''): + self.s = s + self.p = p + self.o = o + self.g = g + self.otype = otype + self.dtype = dtype + self.lang = lang + + diff --git a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py index 51bbefa9..996fc860 100755 --- a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py @@ -6,7 +6,9 @@ null. Output is a list of quads. import logging -from .... direct.cassandra_kg import KnowledgeGraph, GRAPH_WILDCARD, DEFAULT_GRAPH +from .... direct.cassandra_kg import ( + EntityCentricKnowledgeGraph, GRAPH_WILDCARD, DEFAULT_GRAPH +) from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error from .... schema import Term, Triple, IRI, LITERAL from .... base import TriplesQueryService @@ -31,8 +33,37 @@ def get_term_value(term): return term.id or term.value -def create_term(value): - """Create a Term from a string value""" +def create_term(value, otype=None, dtype=None, lang=None): + """ + Create a Term from a string value, optionally using type metadata. + + Args: + value: The string value + otype: Object type - 'u' (URI), 'l' (literal), 't' (triple) + dtype: XSD datatype (for literals) + lang: Language tag (for literals) + + If otype is provided, uses it to determine Term type. + Otherwise falls back to URL detection heuristic. + """ + if otype is not None: + if otype == 'u': + return Term(type=IRI, iri=value) + elif otype == 'l': + return Term( + type=LITERAL, + value=value, + datatype=dtype or "", + language=lang or "" + ) + elif otype == 't': + # Triple/reification - treat as IRI for now + return Term(type=IRI, iri=value) + else: + # Unknown otype, fall back to heuristic + pass + + # Heuristic fallback for backwards compatibility if value.startswith("http://") or value.startswith("https://"): return Term(type=IRI, iri=value) else: @@ -74,14 +105,17 @@ class Processor(TriplesQueryService): user = query.user if user != self.table: + # Use factory function to select implementation + KGClass = EntityCentricKnowledgeGraph + if self.cassandra_username and self.cassandra_password: - self.tg = KnowledgeGraph( + self.tg = KGClass( hosts=self.cassandra_host, keyspace=query.user, username=self.cassandra_username, password=self.cassandra_password ) else: - self.tg = KnowledgeGraph( + self.tg = KGClass( hosts=self.cassandra_host, keyspace=query.user, ) @@ -93,6 +127,14 @@ class Processor(TriplesQueryService): o_val = get_term_value(query.o) g_val = query.g # Already a string or None + # Helper to extract object metadata from result row + def get_o_metadata(t): + """Extract otype/dtype/lang from result row if available""" + otype = getattr(t, 'otype', None) + dtype = getattr(t, 'dtype', None) + lang = getattr(t, 'lang', None) + return otype, dtype, lang + quads = [] # Route to appropriate query method based on which fields are specified @@ -106,7 +148,8 @@ class Processor(TriplesQueryService): ) for t in resp: g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH - quads.append((s_val, p_val, o_val, g)) + otype, dtype, lang = get_o_metadata(t) + quads.append((s_val, p_val, o_val, g, otype, dtype, lang)) else: # SP specified resp = self.tg.get_sp( @@ -115,7 +158,8 @@ class Processor(TriplesQueryService): ) for t in resp: g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH - quads.append((s_val, p_val, t.o, g)) + otype, dtype, lang = get_o_metadata(t) + quads.append((s_val, p_val, t.o, g, otype, dtype, lang)) else: if o_val is not None: # SO specified @@ -125,7 +169,8 @@ class Processor(TriplesQueryService): ) for t in resp: g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH - quads.append((s_val, t.p, o_val, g)) + otype, dtype, lang = get_o_metadata(t) + quads.append((s_val, t.p, o_val, g, otype, dtype, lang)) else: # S only resp = self.tg.get_s( @@ -134,7 +179,8 @@ class Processor(TriplesQueryService): ) for t in resp: g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH - quads.append((s_val, t.p, t.o, g)) + otype, dtype, lang = get_o_metadata(t) + quads.append((s_val, t.p, t.o, g, otype, dtype, lang)) else: if p_val is not None: if o_val is not None: @@ -145,7 +191,8 @@ class Processor(TriplesQueryService): ) for t in resp: g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH - quads.append((t.s, p_val, o_val, g)) + otype, dtype, lang = get_o_metadata(t) + quads.append((t.s, p_val, o_val, g, otype, dtype, lang)) else: # P only resp = self.tg.get_p( @@ -154,7 +201,8 @@ class Processor(TriplesQueryService): ) for t in resp: g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH - quads.append((t.s, p_val, t.o, g)) + otype, dtype, lang = get_o_metadata(t) + quads.append((t.s, p_val, t.o, g, otype, dtype, lang)) else: if o_val is not None: # O only @@ -164,7 +212,8 @@ class Processor(TriplesQueryService): ) for t in resp: g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH - quads.append((t.s, t.p, o_val, g)) + otype, dtype, lang = get_o_metadata(t) + quads.append((t.s, t.p, o_val, g, otype, dtype, lang)) else: # Nothing specified - get all resp = self.tg.get_all( @@ -173,14 +222,16 @@ class Processor(TriplesQueryService): ) for t in resp: g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH - quads.append((t.s, t.p, t.o, g)) + otype, dtype, lang = get_o_metadata(t) + quads.append((t.s, t.p, t.o, g, otype, dtype, lang)) # Convert to Triple objects (with g field) + # Use otype/dtype/lang for proper Term reconstruction if available triples = [ Triple( s=create_term(q[0]), p=create_term(q[1]), - o=create_term(q[2]), + o=create_term(q[2], otype=q[4], dtype=q[5], lang=q[6]), g=q[3] if q[3] != DEFAULT_GRAPH else None ) for q in quads diff --git a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py index 6ed93b7d..5bc842de 100755 --- a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py @@ -10,12 +10,14 @@ import argparse import time import logging -from .... direct.cassandra_kg import KnowledgeGraph, DEFAULT_GRAPH +from .... direct.cassandra_kg import ( + EntityCentricKnowledgeGraph, DEFAULT_GRAPH +) from .... base import TriplesStoreService, CollectionConfigHandler from .... base import AsyncProcessor, Consumer, Producer from .... base import ConsumerMetrics, ProducerMetrics from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config -from .... schema import IRI, LITERAL +from .... schema import IRI, LITERAL, BLANK, TRIPLE # Module logger logger = logging.getLogger(__name__) @@ -36,6 +38,46 @@ def get_term_value(term): return term.id or term.value +def get_term_otype(term): + """ + Get object type code from a Term for entity-centric storage. + + Maps Term.type to otype: + - IRI ("i") → "u" (URI) + - BLANK ("b") → "u" (treated as URI) + - LITERAL ("l") → "l" (Literal) + - TRIPLE ("t") → "t" (Triple/reification) + """ + if term is None: + return "u" + if term.type == IRI or term.type == BLANK: + return "u" + elif term.type == LITERAL: + return "l" + elif term.type == TRIPLE: + return "t" + else: + return "u" + + +def get_term_dtype(term): + """Extract datatype from a Term (for literals)""" + if term is None: + return "" + if term.type == LITERAL: + return term.datatype or "" + return "" + + +def get_term_lang(term): + """Extract language tag from a Term (for literals)""" + if term is None: + return "" + if term.type == LITERAL: + return term.language or "" + return "" + + class Processor(CollectionConfigHandler, TriplesStoreService): def __init__(self, **params): @@ -78,15 +120,18 @@ class Processor(CollectionConfigHandler, TriplesStoreService): self.tg = None + # Use factory function to select implementation + KGClass = EntityCentricKnowledgeGraph + try: if self.cassandra_username and self.cassandra_password: - self.tg = KnowledgeGraph( + self.tg = KGClass( hosts=self.cassandra_host, keyspace=message.metadata.user, username=self.cassandra_username, password=self.cassandra_password ) else: - self.tg = KnowledgeGraph( + self.tg = KGClass( hosts=self.cassandra_host, keyspace=message.metadata.user, ) @@ -105,12 +150,20 @@ class Processor(CollectionConfigHandler, TriplesStoreService): # t.g is None for default graph, or a graph IRI g_val = t.g if t.g is not None else DEFAULT_GRAPH + # Extract object type metadata for entity-centric storage + otype = get_term_otype(t.o) + dtype = get_term_dtype(t.o) + lang = get_term_lang(t.o) + self.tg.insert( message.metadata.collection, s_val, p_val, o_val, - g=g_val + g=g_val, + otype=otype, + dtype=dtype, + lang=lang ) async def create_collection(self, user: str, collection: str, metadata: dict): @@ -120,16 +173,19 @@ class Processor(CollectionConfigHandler, TriplesStoreService): if self.table is None or self.table != user: self.tg = None + # Use factory function to select implementation + KGClass = EntityCentricKnowledgeGraph + try: if self.cassandra_username and self.cassandra_password: - self.tg = KnowledgeGraph( + self.tg = KGClass( hosts=self.cassandra_host, keyspace=user, username=self.cassandra_username, password=self.cassandra_password ) else: - self.tg = KnowledgeGraph( + self.tg = KGClass( hosts=self.cassandra_host, keyspace=user, ) @@ -159,16 +215,19 @@ class Processor(CollectionConfigHandler, TriplesStoreService): if self.table is None or self.table != user: self.tg = None + # Use factory function to select implementation + KGClass = EntityCentricKnowledgeGraph + try: if self.cassandra_username and self.cassandra_password: - self.tg = KnowledgeGraph( + self.tg = KGClass( hosts=self.cassandra_host, keyspace=user, username=self.cassandra_username, password=self.cassandra_password ) else: - self.tg = KnowledgeGraph( + self.tg = KGClass( hosts=self.cassandra_host, keyspace=user, )