Entity-centric graph (#633)

* Tech spec for new entity-centric graph schema

* Graph implementation
cybermaggedon 2026-02-16 13:26:43 +00:00 committed by GitHub
parent f24f1ebd80
commit 00c1ca681b
GPG key ID: B5690EEEBB952194
8 changed files with 1858 additions and 225 deletions


@@ -0,0 +1,260 @@
# Entity-Centric Knowledge Graph Storage on Cassandra
## Overview
This document describes a storage model for RDF-style knowledge graphs on Apache Cassandra. The model uses an **entity-centric** approach where every entity knows every quad it participates in and the role it plays. This replaces a traditional multi-table SPO permutation approach with just two tables.
## Background and Motivation
### The Traditional Approach
A standard RDF quad store on Cassandra requires multiple denormalised tables to cover query patterns — typically 6 or more tables representing different permutations of Subject, Predicate, Object, and Dataset (SPOD). Each quad is written to every table, resulting in significant write amplification, operational overhead, and schema complexity.
Additionally, label resolution (fetching human-readable names for entities) requires separate round-trip queries, which is particularly costly in AI and GraphRAG use cases where labels are essential for LLM context.
### The Entity-Centric Insight
Every quad `(D, S, P, O)` involves up to 4 entities. By writing a row for each entity's participation in the quad, we guarantee that **any query with at least one known element will hit a partition key**. This covers all 16 query patterns with a single data table.
Key benefits:
- **2 tables** instead of 6+
- **4 writes per quad** (3 when the object is a literal) instead of 6+
- **Label resolution for free** — an entity's labels are co-located with its relationships, naturally warming the application cache
- **All 16 query patterns** served by single-partition reads
- **Simpler operations** — one data table to tune, compact, and repair
## Schema
### Table 1: quads_by_entity
The primary data table. Every entity has a partition containing all quads it participates in. Named to reflect the query pattern (lookup by entity).
```sql
CREATE TABLE quads_by_entity (
collection text, -- Collection/tenant scope (always specified)
entity text, -- The entity this row is about
role text, -- 'S', 'P', 'O', 'G' — how this entity participates
p text, -- Predicate of the quad
otype text, -- 'U' (URI), 'L' (literal), 'T' (triple/reification)
s text, -- Subject of the quad
o text, -- Object of the quad
d text, -- Dataset/graph of the quad
dtype text, -- XSD datatype (when otype = 'L'), e.g. 'xsd:string'
lang text, -- Language tag (when otype = 'L'), e.g. 'en', 'fr'
PRIMARY KEY ((collection, entity), role, p, otype, s, o, d)
);
```
**Partition key**: `(collection, entity)` — scoped to collection, one partition per entity.
**Clustering column order rationale**:
1. **role** — most queries start with "where is this entity a subject/object"
2. **p** — next most common filter, "give me all `knows` relationships"
3. **otype** — enables filtering by URI-valued vs literal-valued relationships
4. **s, o, d** — remaining columns for uniqueness
### Table 2: quads_by_collection
Supports collection-level queries and deletion. Provides a manifest of all quads belonging to a collection. Named to reflect the query pattern (lookup by collection).
```sql
CREATE TABLE quads_by_collection (
collection text,
d text, -- Dataset/graph of the quad
s text, -- Subject of the quad
p text, -- Predicate of the quad
o text, -- Object of the quad
otype text, -- 'U' (URI), 'L' (literal), 'T' (triple/reification)
dtype text, -- XSD datatype (when otype = 'L')
lang text, -- Language tag (when otype = 'L')
PRIMARY KEY (collection, d, s, p, o)
);
```
Clustered by dataset first, enabling deletion at either collection or dataset granularity.
## Write Path
For each incoming quad `(D, S, P, O)` within a collection `C`, write **4 rows** to `quads_by_entity` (one per participating entity; 3 when the object is a literal) and **1 row** to `quads_by_collection`.
### Example
Given the quad in collection `tenant1`:
```
Dataset: https://example.org/graph1
Subject: https://example.org/Alice
Predicate: https://example.org/knows
Object: https://example.org/Bob
```
Write 4 rows to `quads_by_entity`:
| collection | entity | role | p | otype | s | o | d |
|---|---|---|---|---|---|---|---|
| tenant1 | https://example.org/graph1 | G | https://example.org/knows | U | https://example.org/Alice | https://example.org/Bob | https://example.org/graph1 |
| tenant1 | https://example.org/Alice | S | https://example.org/knows | U | https://example.org/Alice | https://example.org/Bob | https://example.org/graph1 |
| tenant1 | https://example.org/knows | P | https://example.org/knows | U | https://example.org/Alice | https://example.org/Bob | https://example.org/graph1 |
| tenant1 | https://example.org/Bob | O | https://example.org/knows | U | https://example.org/Alice | https://example.org/Bob | https://example.org/graph1 |
Write 1 row to `quads_by_collection`:
| collection | d | s | p | o | otype | dtype | lang |
|---|---|---|---|---|---|---|---|
| tenant1 | https://example.org/graph1 | https://example.org/Alice | https://example.org/knows | https://example.org/Bob | U | | |
### Literal Example
For a label triple:
```
Dataset: https://example.org/graph1
Subject: https://example.org/Alice
Predicate: http://www.w3.org/2000/01/rdf-schema#label
Object: "Alice Smith" (lang: en)
```
The `otype` is `'L'`, `dtype` is `'xsd:string'`, and `lang` is `'en'`. The literal value `"Alice Smith"` is stored in `o`. Only 3 rows are needed in `quads_by_entity` — no row is written for the literal as entity, since literals are not independently queryable entities.
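The write fan-out above can be sketched as a generator over entity roles. This is a minimal illustration of the rule, not the shipped implementation; the function and dict field names are hypothetical, chosen to mirror the schema columns.

```python
def entity_rows(collection, d, s, p, o, otype="U", dtype="", lang=""):
    """Yield one quads_by_entity row per entity role in the quad.

    Literals ('L') get no row of their own, so a literal-object quad
    fans out to 3 rows instead of 4.
    """
    roles = [("G", d), ("S", s), ("P", p)]
    if otype != "L":  # literals are not independently queryable entities
        roles.append(("O", o))
    for role, entity in roles:
        yield {
            "collection": collection, "entity": entity, "role": role,
            "p": p, "otype": otype, "s": s, "o": o, "d": d,
            "dtype": dtype, "lang": lang,
        }

rows = list(entity_rows("tenant1",
                        "https://example.org/graph1",
                        "https://example.org/Alice",
                        "https://example.org/knows",
                        "https://example.org/Bob"))
assert len(rows) == 4  # URI object: G, S, P, O rows
```

Note that every generated row repeats the full quad; the role and entity columns only change which partition the copy lands in.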
## Query Patterns
### All 16 DSPO Patterns
In the table below, "Perfect prefix" means the query uses a contiguous prefix of the clustering columns. "Partition scan + filter" means Cassandra reads a slice of one partition and filters in memory — still efficient, just not a pure prefix match.
| # | Known | Lookup entity | Clustering prefix | Efficiency |
|---|---|---|---|---|
| 1 | D,S,P,O | entity=S, role='S', p=P | Full match | Perfect prefix |
| 2 | D,S,P,? | entity=S, role='S', p=P | Filter on D | Partition scan + filter |
| 3 | D,S,?,O | entity=S, role='S' | Filter on D, O | Partition scan + filter |
| 4 | D,?,P,O | entity=O, role='O', p=P | Filter on D | Partition scan + filter |
| 5 | ?,S,P,O | entity=S, role='S', p=P | Filter on O | Partition scan + filter |
| 6 | D,S,?,? | entity=S, role='S' | Filter on D | Partition scan + filter |
| 7 | D,?,P,? | entity=P, role='P' | Filter on D | Partition scan + filter |
| 8 | D,?,?,O | entity=O, role='O' | Filter on D | Partition scan + filter |
| 9 | ?,S,P,? | entity=S, role='S', p=P | — | **Perfect prefix** |
| 10 | ?,S,?,O | entity=S, role='S' | Filter on O | Partition scan + filter |
| 11 | ?,?,P,O | entity=O, role='O', p=P | — | **Perfect prefix** |
| 12 | D,?,?,? | entity=D, role='G' | — | **Perfect prefix** |
| 13 | ?,S,?,? | entity=S, role='S' | — | **Perfect prefix** |
| 14 | ?,?,P,? | entity=P, role='P' | — | **Perfect prefix** |
| 15 | ?,?,?,O | entity=O, role='O' | — | **Perfect prefix** |
| 16 | ?,?,?,? | — | Full scan | Exploration only |
**Key result**: 7 of the 15 non-trivial patterns are perfect clustering prefix hits. The remaining 8 are single-partition reads with in-partition filtering. Every query with at least one known element hits a partition key.
Pattern 16 (?,?,?,?) never requires a full scan in practice: collection is always specified, so the query becomes a single-partition read of `quads_by_collection` for that collection.
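The lookup-entity column of the table reduces to a simple priority rule, which can be sketched as follows (illustrative code, not the shipped query planner):

```python
def route(d=None, s=None, p=None, o=None):
    """Map known quad elements to (partition entity, role, clustering p prefix).

    Priority follows the table above: anchor on the subject when S is known,
    else the object, else the predicate, else the dataset. Any remaining
    known elements are filtered within the partition.
    """
    if s is not None:
        return s, "S", p          # patterns 1-3, 5, 6, 9, 10, 13
    if o is not None:
        return o, "O", p          # patterns 4, 8, 11, 15
    if p is not None:
        return p, "P", None       # patterns 7, 14
    if d is not None:
        return d, "G", None       # pattern 12
    raise ValueError("at least one of D, S, P, O must be known")

assert route(s="ex:Alice", p="ex:knows") == ("ex:Alice", "S", "ex:knows")
assert route(o="ex:Bob") == ("ex:Bob", "O", None)
```

Subject and object partitions are preferred because they tend to be the narrowest; predicate partitions (e.g. all uses of `rdfs:label`) can be much wider.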
### Common Query Examples
**Everything about an entity:**
```sql
SELECT * FROM quads_by_entity
WHERE collection = 'tenant1' AND entity = 'https://example.org/Alice';
```
**All outgoing relationships for an entity:**
```sql
SELECT * FROM quads_by_entity
WHERE collection = 'tenant1' AND entity = 'https://example.org/Alice'
AND role = 'S';
```
**Specific predicate for an entity:**
```sql
SELECT * FROM quads_by_entity
WHERE collection = 'tenant1' AND entity = 'https://example.org/Alice'
AND role = 'S' AND p = 'https://example.org/knows';
```
**Label for an entity (specific language):**
```sql
SELECT * FROM quads_by_entity
WHERE collection = 'tenant1' AND entity = 'https://example.org/Alice'
AND role = 'S' AND p = 'http://www.w3.org/2000/01/rdf-schema#label'
AND otype = 'L';
```
Then filter by `lang = 'en'` application-side if needed.
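The application-side language filter can be sketched like this, with `rows` standing in for the result set of the label query above (field names follow the `quads_by_entity` schema; the function itself is illustrative):

```python
RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"

def pick_label(rows, lang="en"):
    """Prefer a label in the requested language, falling back to any label."""
    labels = [r for r in rows
              if r["p"] == RDFS_LABEL and r["otype"] == "L"]
    for r in labels:
        if r["lang"] == lang:
            return r["o"]
    return labels[0]["o"] if labels else None

rows = [
    {"p": RDFS_LABEL, "otype": "L", "o": "Alice Smith", "lang": "en"},
    {"p": RDFS_LABEL, "otype": "L", "o": "Alice Dupont", "lang": "fr"},
]
assert pick_label(rows, "fr") == "Alice Dupont"
```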
**Only URI-valued relationships (entity-to-entity links):**
```sql
SELECT * FROM quads_by_entity
WHERE collection = 'tenant1' AND entity = 'https://example.org/Alice'
AND role = 'S' AND p = 'https://example.org/knows' AND otype = 'U';
```
**Reverse lookup — what points to this entity:**
```sql
SELECT * FROM quads_by_entity
WHERE collection = 'tenant1' AND entity = 'https://example.org/Bob'
AND role = 'O';
```
## Label Resolution and Cache Warming
One of the most significant advantages of the entity-centric model is that **label resolution becomes a free side effect**.
In the traditional multi-table model, fetching labels requires separate round-trip queries: retrieve triples, identify entity URIs in the results, then fetch `rdfs:label` for each. This N+1 pattern is expensive.
In the entity-centric model, querying an entity returns **all** its quads — including its labels, types, and other properties. When the application caches query results, labels are pre-warmed before anything asks for them.
Two usage regimes confirm this works well in practice:
- **Human-facing queries**: naturally small result sets, labels essential. Entity reads pre-warm the cache.
- **AI/bulk queries**: large result sets with hard limits. Labels either unnecessary or needed only for a curated subset of entities already in cache.
The theoretical concern of resolving labels for huge result sets (e.g. 30,000 entities) is mitigated by the practical observation that no human or AI consumer usefully processes that many labels. Application-level query limits ensure cache pressure remains manageable.
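The cache-warming effect can be sketched with a toy entity cache; `FakeStore` and the method names are hypothetical stand-ins for the real Cassandra client, used here only to show that the label lookup costs no extra round trip:

```python
class FakeStore:
    """Stand-in for the entity query path; counts round trips."""
    def __init__(self):
        self.reads = 0
    def get_entity(self, collection, entity):
        self.reads += 1
        return [
            {"p": "https://example.org/knows", "o": "https://example.org/Bob"},
            {"p": "http://www.w3.org/2000/01/rdf-schema#label",
             "o": "Alice Smith"},
        ]

class EntityCache:
    def __init__(self, store):
        self.store, self.cache = store, {}
    def quads(self, collection, entity):
        key = (collection, entity)
        if key not in self.cache:
            self.cache[key] = self.store.get_entity(collection, entity)
        return self.cache[key]
    def label(self, collection, entity):
        # No extra query: labels ride along with the entity's other quads.
        for q in self.quads(collection, entity):
            if q["p"].endswith("rdf-schema#label"):
                return q["o"]
        return None

store = FakeStore()
cache = EntityCache(store)
cache.quads("tenant1", "https://example.org/Alice")  # one store read
assert cache.label("tenant1", "https://example.org/Alice") == "Alice Smith"
assert store.reads == 1  # the label came from the warmed cache entry
```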
## Wide Partitions and Reification
Reification (RDF-star style statements about statements) creates hub entities — e.g. a source document that supports thousands of extracted facts. This can produce wide partitions.
Mitigating factors:
- **Application-level query limits**: all GraphRAG and human-facing queries enforce hard limits, so wide partitions are never fully scanned on the hot read path
- **Cassandra handles partial reads efficiently**: a clustering column scan with an early stop is fast even on large partitions
- **Collection deletion** (the only operation that might traverse full partitions) is an accepted background process
## Collection Deletion
Collection deletion is triggered by an API call and runs in the background (eventually consistent).
1. Read `quads_by_collection` for the target collection to get all quads
2. Extract unique entities from the quads (s, p, o, d values)
3. For each unique entity, delete the partition from `quads_by_entity`
4. Delete the rows from `quads_by_collection`
The `quads_by_collection` table provides the index needed to locate all entity partitions without a full table scan. Partition-level deletes are efficient since `(collection, entity)` is the partition key.
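The four steps above can be sketched as follows, assuming a `cassandra-driver`-style session object; the statement text mirrors the schema in this document but is illustrative rather than the shipped code:

```python
def delete_collection(session, collection):
    # 1. Read the collection manifest to enumerate all quads.
    quads = session.execute(
        "SELECT d, s, p, o FROM quads_by_collection WHERE collection = %s",
        (collection,))
    # 2. Collect the unique entities those quads touch.
    entities = set()
    for q in quads:
        entities.update((q.d, q.s, q.p, q.o))
    # 3. Drop each entity partition (partition-level delete, cheap).
    for entity in entities:
        session.execute(
            "DELETE FROM quads_by_entity "
            "WHERE collection = %s AND entity = %s",
            (collection, entity))
    # 4. Drop the manifest partition itself.
    session.execute(
        "DELETE FROM quads_by_collection WHERE collection = %s",
        (collection,))
```

A production version would batch or rate-limit step 3, since a large collection can touch many entity partitions.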
## Migration Path from Multi-Table Model
The entity-centric model can coexist with the existing multi-table model during migration:
1. Deploy `quads_by_entity` and `quads_by_collection` tables alongside existing tables
2. Dual-write new quads to both old and new tables
3. Backfill existing data into the new tables
4. Migrate read paths one query pattern at a time
5. Decommission old tables once all reads are migrated
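Step 2's dual-write can be sketched as a thin wrapper; `legacy` and `entity_centric` are hypothetical names for the old and new store clients:

```python
class DualWriter:
    """Fan each insert out to both schemas so either read path stays
    correct while queries migrate one pattern at a time."""
    def __init__(self, legacy, entity_centric):
        self.legacy = legacy
        self.entity_centric = entity_centric
    def insert(self, **quad):
        self.legacy.insert(**quad)           # old multi-table fan-out
        self.entity_centric.insert(**quad)   # new 2-table fan-out
```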
## Summary
| Aspect | Traditional (multi-table) | Entity-centric (2-table) |
|---|---|---|
| Tables | 6+ | 2 |
| Writes per quad | 6+ | 5 (4 data + 1 manifest) |
| Label resolution | Separate round trips | Free via cache warming |
| Query patterns | 16 across 6 tables | 16 on 1 table |
| Schema complexity | High | Low |
| Operational overhead | 6+ tables to tune/repair | 1 data table |
| Reification support | Additional complexity | Natural fit |
| Object type filtering | Not available | Native (via otype clustering) |


@@ -20,3 +20,4 @@ markers =
unit: marks tests as unit tests
contract: marks tests as contract tests (service interface validation)
vertexai: marks tests as vertex ai specific tests
asyncio: marks tests that use asyncio


@@ -0,0 +1,599 @@
"""
Unit tests for EntityCentricKnowledgeGraph class
Tests the entity-centric knowledge graph implementation without requiring
an actual Cassandra connection. Uses mocking to verify correct behavior.
"""
import pytest
from unittest.mock import MagicMock, patch, call
import os
class TestEntityCentricKnowledgeGraph:
"""Test cases for EntityCentricKnowledgeGraph"""
@pytest.fixture
def mock_cluster(self):
"""Create a mock Cassandra cluster"""
with patch('trustgraph.direct.cassandra_kg.Cluster') as mock_cluster_cls:
mock_cluster = MagicMock()
mock_session = MagicMock()
mock_cluster.connect.return_value = mock_session
mock_cluster_cls.return_value = mock_cluster
yield mock_cluster_cls, mock_cluster, mock_session
@pytest.fixture
def entity_kg(self, mock_cluster):
"""Create an EntityCentricKnowledgeGraph instance with mocked Cassandra"""
from trustgraph.direct.cassandra_kg import EntityCentricKnowledgeGraph
mock_cluster_cls, mock_cluster, mock_session = mock_cluster
# Create instance
kg = EntityCentricKnowledgeGraph(hosts=['localhost'], keyspace='test_keyspace')
return kg, mock_session
def test_init_creates_entity_centric_schema(self, mock_cluster):
"""Test that initialization creates the 2-table entity-centric schema"""
from trustgraph.direct.cassandra_kg import EntityCentricKnowledgeGraph
mock_cluster_cls, mock_cluster, mock_session = mock_cluster
kg = EntityCentricKnowledgeGraph(hosts=['localhost'], keyspace='test_keyspace')
# Verify schema tables were created
execute_calls = mock_session.execute.call_args_list
# Check for keyspace creation
keyspace_created = any('create keyspace' in str(c).lower() for c in execute_calls)
assert keyspace_created
# Check for quads_by_entity table
entity_table_created = any('quads_by_entity' in str(c) for c in execute_calls)
assert entity_table_created
# Check for quads_by_collection table
collection_table_created = any('quads_by_collection' in str(c) for c in execute_calls)
assert collection_table_created
# Check for collection_metadata table
metadata_table_created = any('collection_metadata' in str(c) for c in execute_calls)
assert metadata_table_created
def test_prepare_statements_initialized(self, entity_kg):
"""Test that prepared statements are initialized"""
kg, mock_session = entity_kg
# Verify prepare was called for various statements
assert mock_session.prepare.called
prepare_calls = mock_session.prepare.call_args_list
# Check that key prepared statements exist
# Insert statements
insert_entity_stmt = any('INSERT INTO' in str(c) and 'quads_by_entity' in str(c)
for c in prepare_calls)
assert insert_entity_stmt
insert_collection_stmt = any('INSERT INTO' in str(c) and 'quads_by_collection' in str(c)
for c in prepare_calls)
assert insert_collection_stmt
def test_insert_uri_object_creates_4_entity_rows(self, entity_kg):
"""Test that inserting a quad with URI object creates 4 entity rows"""
kg, mock_session = entity_kg
# Reset mocks to track only insert-related calls
mock_session.reset_mock()
kg.insert(
collection='test_collection',
s='http://example.org/Alice',
p='http://example.org/knows',
o='http://example.org/Bob',
g='http://example.org/graph1',
otype='u'
)
# Verify batch was executed
mock_session.execute.assert_called()
def test_insert_literal_object_creates_3_entity_rows(self, entity_kg):
"""Test that inserting a quad with literal object creates 3 entity rows"""
kg, mock_session = entity_kg
mock_session.reset_mock()
kg.insert(
collection='test_collection',
s='http://example.org/Alice',
p='http://www.w3.org/2000/01/rdf-schema#label',
o='Alice Smith',
g=None,
otype='l',
dtype='xsd:string',
lang='en'
)
# Verify batch was executed
mock_session.execute.assert_called()
def test_insert_default_graph(self, entity_kg):
"""Test that None graph is stored as empty string"""
kg, mock_session = entity_kg
mock_session.reset_mock()
kg.insert(
collection='test_collection',
s='http://example.org/Alice',
p='http://example.org/knows',
o='http://example.org/Bob',
g=None,
otype='u'
)
mock_session.execute.assert_called()
def test_insert_auto_detects_otype(self, entity_kg):
"""Test that otype is auto-detected when not provided"""
kg, mock_session = entity_kg
mock_session.reset_mock()
# URI should be auto-detected
kg.insert(
collection='test_collection',
s='http://example.org/Alice',
p='http://example.org/knows',
o='http://example.org/Bob'
)
mock_session.execute.assert_called()
mock_session.reset_mock()
# Literal should be auto-detected
kg.insert(
collection='test_collection',
s='http://example.org/Alice',
p='http://example.org/name',
o='Alice'
)
mock_session.execute.assert_called()
def test_get_s_returns_quads_for_subject(self, entity_kg):
"""Test get_s queries by subject"""
kg, mock_session = entity_kg
# Mock the query result
mock_result = [
MagicMock(p='http://example.org/knows', o='http://example.org/Bob',
d='', otype='u', dtype='', lang='', s='http://example.org/Alice')
]
mock_session.execute.return_value = mock_result
results = kg.get_s('test_collection', 'http://example.org/Alice')
# Verify query was executed
mock_session.execute.assert_called()
# Results should be QuadResult objects
assert len(results) == 1
assert results[0].s == 'http://example.org/Alice'
assert results[0].p == 'http://example.org/knows'
assert results[0].o == 'http://example.org/Bob'
def test_get_p_returns_quads_for_predicate(self, entity_kg):
"""Test get_p queries by predicate"""
kg, mock_session = entity_kg
mock_result = [
MagicMock(s='http://example.org/Alice', o='http://example.org/Bob',
d='', otype='u', dtype='', lang='', p='http://example.org/knows')
]
mock_session.execute.return_value = mock_result
results = kg.get_p('test_collection', 'http://example.org/knows')
mock_session.execute.assert_called()
assert len(results) == 1
def test_get_o_returns_quads_for_object(self, entity_kg):
"""Test get_o queries by object"""
kg, mock_session = entity_kg
mock_result = [
MagicMock(s='http://example.org/Alice', p='http://example.org/knows',
d='', otype='u', dtype='', lang='', o='http://example.org/Bob')
]
mock_session.execute.return_value = mock_result
results = kg.get_o('test_collection', 'http://example.org/Bob')
mock_session.execute.assert_called()
assert len(results) == 1
def test_get_sp_returns_quads_for_subject_predicate(self, entity_kg):
"""Test get_sp queries by subject and predicate"""
kg, mock_session = entity_kg
mock_result = [
MagicMock(o='http://example.org/Bob', d='', otype='u', dtype='', lang='')
]
mock_session.execute.return_value = mock_result
results = kg.get_sp('test_collection', 'http://example.org/Alice',
'http://example.org/knows')
mock_session.execute.assert_called()
assert len(results) == 1
def test_get_po_returns_quads_for_predicate_object(self, entity_kg):
"""Test get_po queries by predicate and object"""
kg, mock_session = entity_kg
mock_result = [
MagicMock(s='http://example.org/Alice', d='', otype='u', dtype='', lang='',
o='http://example.org/Bob')
]
mock_session.execute.return_value = mock_result
results = kg.get_po('test_collection', 'http://example.org/knows',
'http://example.org/Bob')
mock_session.execute.assert_called()
assert len(results) == 1
def test_get_os_returns_quads_for_object_subject(self, entity_kg):
"""Test get_os queries by object and subject"""
kg, mock_session = entity_kg
mock_result = [
MagicMock(p='http://example.org/knows', d='', otype='u', dtype='', lang='',
s='http://example.org/Alice', o='http://example.org/Bob')
]
mock_session.execute.return_value = mock_result
results = kg.get_os('test_collection', 'http://example.org/Bob',
'http://example.org/Alice')
mock_session.execute.assert_called()
assert len(results) == 1
def test_get_spo_returns_quads_for_subject_predicate_object(self, entity_kg):
"""Test get_spo queries by subject, predicate, and object"""
kg, mock_session = entity_kg
mock_result = [
MagicMock(d='', otype='u', dtype='', lang='',
o='http://example.org/Bob')
]
mock_session.execute.return_value = mock_result
results = kg.get_spo('test_collection', 'http://example.org/Alice',
'http://example.org/knows', 'http://example.org/Bob')
mock_session.execute.assert_called()
assert len(results) == 1
def test_get_g_returns_quads_for_graph(self, entity_kg):
"""Test get_g queries by graph"""
kg, mock_session = entity_kg
mock_result = [
MagicMock(s='http://example.org/Alice', p='http://example.org/knows',
o='http://example.org/Bob', otype='u', dtype='', lang='')
]
mock_session.execute.return_value = mock_result
results = kg.get_g('test_collection', 'http://example.org/graph1')
mock_session.execute.assert_called()
def test_get_all_returns_all_quads_in_collection(self, entity_kg):
"""Test get_all returns all quads"""
kg, mock_session = entity_kg
mock_result = [
MagicMock(d='', s='http://example.org/Alice', p='http://example.org/knows',
o='http://example.org/Bob', otype='u', dtype='', lang='')
]
mock_session.execute.return_value = mock_result
results = kg.get_all('test_collection')
mock_session.execute.assert_called()
def test_graph_wildcard_returns_all_graphs(self, entity_kg):
"""Test that g='*' returns quads from all graphs"""
from trustgraph.direct.cassandra_kg import GRAPH_WILDCARD
kg, mock_session = entity_kg
mock_result = [
MagicMock(p='http://example.org/knows', d='http://example.org/graph1',
otype='u', dtype='', lang='', s='http://example.org/Alice',
o='http://example.org/Bob'),
MagicMock(p='http://example.org/knows', d='http://example.org/graph2',
otype='u', dtype='', lang='', s='http://example.org/Alice',
o='http://example.org/Charlie')
]
mock_session.execute.return_value = mock_result
results = kg.get_s('test_collection', 'http://example.org/Alice', g=GRAPH_WILDCARD)
# Should return quads from both graphs
assert len(results) == 2
def test_specific_graph_filters_results(self, entity_kg):
"""Test that specifying a graph filters results"""
kg, mock_session = entity_kg
mock_result = [
MagicMock(p='http://example.org/knows', d='http://example.org/graph1',
otype='u', dtype='', lang='', s='http://example.org/Alice',
o='http://example.org/Bob'),
MagicMock(p='http://example.org/knows', d='http://example.org/graph2',
otype='u', dtype='', lang='', s='http://example.org/Alice',
o='http://example.org/Charlie')
]
mock_session.execute.return_value = mock_result
results = kg.get_s('test_collection', 'http://example.org/Alice',
g='http://example.org/graph1')
# Should only return quads from graph1
assert len(results) == 1
assert results[0].g == 'http://example.org/graph1'
def test_collection_exists_returns_true_when_exists(self, entity_kg):
"""Test collection_exists returns True for existing collection"""
kg, mock_session = entity_kg
mock_result = [MagicMock(collection='test_collection')]
mock_session.execute.return_value = mock_result
exists = kg.collection_exists('test_collection')
assert exists is True
def test_collection_exists_returns_false_when_not_exists(self, entity_kg):
"""Test collection_exists returns False for non-existing collection"""
kg, mock_session = entity_kg
mock_session.execute.return_value = []
exists = kg.collection_exists('nonexistent_collection')
assert exists is False
def test_create_collection_inserts_metadata(self, entity_kg):
"""Test create_collection inserts metadata row"""
kg, mock_session = entity_kg
mock_session.reset_mock()
kg.create_collection('test_collection')
# Verify INSERT was executed for collection_metadata
mock_session.execute.assert_called()
def test_delete_collection_removes_all_data(self, entity_kg):
"""Test delete_collection removes entity partitions and collection rows"""
kg, mock_session = entity_kg
# Mock reading quads from collection
mock_quads = [
MagicMock(d='', s='http://example.org/Alice', p='http://example.org/knows',
o='http://example.org/Bob', otype='u')
]
mock_session.execute.return_value = mock_quads
mock_session.reset_mock()
kg.delete_collection('test_collection')
# Verify delete operations were executed
assert mock_session.execute.called
def test_close_shuts_down_connections(self, entity_kg):
"""Test close shuts down session and cluster"""
kg, mock_session = entity_kg
kg.close()
mock_session.shutdown.assert_called_once()
kg.cluster.shutdown.assert_called_once()
class TestQuadResult:
"""Test cases for QuadResult class"""
def test_quad_result_stores_all_fields(self):
"""Test QuadResult stores all quad fields"""
from trustgraph.direct.cassandra_kg import QuadResult
result = QuadResult(
s='http://example.org/Alice',
p='http://example.org/knows',
o='http://example.org/Bob',
g='http://example.org/graph1',
otype='u',
dtype='',
lang=''
)
assert result.s == 'http://example.org/Alice'
assert result.p == 'http://example.org/knows'
assert result.o == 'http://example.org/Bob'
assert result.g == 'http://example.org/graph1'
assert result.otype == 'u'
assert result.dtype == ''
assert result.lang == ''
def test_quad_result_defaults(self):
"""Test QuadResult default values"""
from trustgraph.direct.cassandra_kg import QuadResult
result = QuadResult(
s='http://example.org/s',
p='http://example.org/p',
o='literal value',
g=''
)
assert result.otype == 'u' # Default otype
assert result.dtype == ''
assert result.lang == ''
def test_quad_result_with_literal_metadata(self):
"""Test QuadResult with literal metadata"""
from trustgraph.direct.cassandra_kg import QuadResult
result = QuadResult(
s='http://example.org/Alice',
p='http://www.w3.org/2000/01/rdf-schema#label',
o='Alice Smith',
g='',
otype='l',
dtype='xsd:string',
lang='en'
)
assert result.otype == 'l'
assert result.dtype == 'xsd:string'
assert result.lang == 'en'
class TestWriteHelperFunctions:
"""Test cases for helper functions in write.py"""
def test_get_term_otype_for_iri(self):
"""Test get_term_otype returns 'u' for IRI terms"""
from trustgraph.storage.triples.cassandra.write import get_term_otype
from trustgraph.schema import Term, IRI
term = Term(type=IRI, iri='http://example.org/Alice')
assert get_term_otype(term) == 'u'
def test_get_term_otype_for_literal(self):
"""Test get_term_otype returns 'l' for LITERAL terms"""
from trustgraph.storage.triples.cassandra.write import get_term_otype
from trustgraph.schema import Term, LITERAL
term = Term(type=LITERAL, value='Alice Smith')
assert get_term_otype(term) == 'l'
def test_get_term_otype_for_blank(self):
"""Test get_term_otype returns 'u' for BLANK terms"""
from trustgraph.storage.triples.cassandra.write import get_term_otype
from trustgraph.schema import Term, BLANK
term = Term(type=BLANK, id='_:b1')
assert get_term_otype(term) == 'u'
def test_get_term_otype_for_triple(self):
"""Test get_term_otype returns 't' for TRIPLE terms"""
from trustgraph.storage.triples.cassandra.write import get_term_otype
from trustgraph.schema import Term, TRIPLE
term = Term(type=TRIPLE)
assert get_term_otype(term) == 't'
def test_get_term_otype_for_none(self):
"""Test get_term_otype returns 'u' for None"""
from trustgraph.storage.triples.cassandra.write import get_term_otype
assert get_term_otype(None) == 'u'
def test_get_term_dtype_for_literal(self):
"""Test get_term_dtype extracts datatype from LITERAL"""
from trustgraph.storage.triples.cassandra.write import get_term_dtype
from trustgraph.schema import Term, LITERAL
term = Term(type=LITERAL, value='42', datatype='xsd:integer')
assert get_term_dtype(term) == 'xsd:integer'
def test_get_term_dtype_for_non_literal(self):
"""Test get_term_dtype returns empty string for non-LITERAL"""
from trustgraph.storage.triples.cassandra.write import get_term_dtype
from trustgraph.schema import Term, IRI
term = Term(type=IRI, iri='http://example.org/Alice')
assert get_term_dtype(term) == ''
def test_get_term_dtype_for_none(self):
"""Test get_term_dtype returns empty string for None"""
from trustgraph.storage.triples.cassandra.write import get_term_dtype
assert get_term_dtype(None) == ''
def test_get_term_lang_for_literal(self):
"""Test get_term_lang extracts language from LITERAL"""
from trustgraph.storage.triples.cassandra.write import get_term_lang
from trustgraph.schema import Term, LITERAL
term = Term(type=LITERAL, value='Alice Smith', language='en')
assert get_term_lang(term) == 'en'
def test_get_term_lang_for_non_literal(self):
"""Test get_term_lang returns empty string for non-LITERAL"""
from trustgraph.storage.triples.cassandra.write import get_term_lang
from trustgraph.schema import Term, IRI
term = Term(type=IRI, iri='http://example.org/Alice')
assert get_term_lang(term) == ''
class TestServiceHelperFunctions:
"""Test cases for helper functions in service.py"""
def test_create_term_with_uri_otype(self):
"""Test create_term creates IRI Term for otype='u'"""
from trustgraph.query.triples.cassandra.service import create_term
from trustgraph.schema import IRI
term = create_term('http://example.org/Alice', otype='u')
assert term.type == IRI
assert term.iri == 'http://example.org/Alice'
def test_create_term_with_literal_otype(self):
"""Test create_term creates LITERAL Term for otype='l'"""
from trustgraph.query.triples.cassandra.service import create_term
from trustgraph.schema import LITERAL
term = create_term('Alice Smith', otype='l', dtype='xsd:string', lang='en')
assert term.type == LITERAL
assert term.value == 'Alice Smith'
assert term.datatype == 'xsd:string'
assert term.language == 'en'
def test_create_term_with_triple_otype(self):
"""Test create_term creates IRI Term for otype='t'"""
from trustgraph.query.triples.cassandra.service import create_term
from trustgraph.schema import IRI
term = create_term('http://example.org/statement1', otype='t')
assert term.type == IRI
assert term.iri == 'http://example.org/statement1'
def test_create_term_heuristic_fallback_uri(self):
"""Test create_term uses URL heuristic when otype not provided"""
from trustgraph.query.triples.cassandra.service import create_term
from trustgraph.schema import IRI
term = create_term('http://example.org/Alice')
assert term.type == IRI
assert term.iri == 'http://example.org/Alice'
def test_create_term_heuristic_fallback_literal(self):
"""Test create_term uses literal heuristic when otype not provided"""
from trustgraph.query.triples.cassandra.service import create_term
from trustgraph.schema import LITERAL
term = create_term('Alice Smith')
assert term.type == LITERAL
assert term.value == 'Alice Smith'


@@ -70,17 +70,21 @@ class TestCassandraQueryProcessor:
assert result.type == LITERAL
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_query_triples_spo_query(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_query_triples_spo_query(self, mock_kg_class):
"""Test querying triples with subject, predicate, and object specified"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
# Setup mock TrustGraph
# Setup mock TrustGraph via factory function
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
# SPO query returns a list of results (with mock graph attribute)
mock_result = MagicMock()
mock_result.g = None
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_result.o = 'test_object'
mock_tg_instance.get_spo.return_value = [mock_result]
processor = Processor(
@@ -102,7 +106,7 @@ class TestCassandraQueryProcessor:
result = await processor.query_triples(query)
# Verify KnowledgeGraph was created with correct parameters
mock_trustgraph.assert_called_once_with(
mock_kg_class.assert_called_once_with(
hosts=['localhost'],
keyspace='test_user'
)
@@ -146,17 +150,21 @@ class TestCassandraQueryProcessor:
assert processor.table is None
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_query_triples_sp_pattern(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_query_triples_sp_pattern(self, mock_kg_class):
"""Test SP query pattern (subject and predicate, no object)"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
# Setup mock TrustGraph and response
# Setup mock TrustGraph via factory function
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
mock_result = MagicMock()
mock_result.o = 'result_object'
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_tg_instance.get_sp.return_value = [mock_result]
processor = Processor(taskgroup=MagicMock())
@@ -179,17 +187,21 @@ class TestCassandraQueryProcessor:
assert result[0].o.value == 'result_object'
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_query_triples_s_pattern(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_query_triples_s_pattern(self, mock_kg_class):
"""Test S query pattern (subject only)"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
mock_result = MagicMock()
mock_result.p = 'result_predicate'
mock_result.o = 'result_object'
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_tg_instance.get_s.return_value = [mock_result]
processor = Processor(taskgroup=MagicMock())
@@ -212,17 +224,21 @@ class TestCassandraQueryProcessor:
assert result[0].o.value == 'result_object'
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_query_triples_p_pattern(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_query_triples_p_pattern(self, mock_kg_class):
"""Test P query pattern (predicate only)"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
mock_result = MagicMock()
mock_result.s = 'result_subject'
mock_result.o = 'result_object'
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_tg_instance.get_p.return_value = [mock_result]
processor = Processor(taskgroup=MagicMock())
@@ -245,17 +261,21 @@ class TestCassandraQueryProcessor:
assert result[0].o.value == 'result_object'
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_query_triples_o_pattern(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_query_triples_o_pattern(self, mock_kg_class):
"""Test O query pattern (object only)"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
mock_result = MagicMock()
mock_result.s = 'result_subject'
mock_result.p = 'result_predicate'
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_tg_instance.get_o.return_value = [mock_result]
processor = Processor(taskgroup=MagicMock())
@@ -278,18 +298,22 @@ class TestCassandraQueryProcessor:
assert result[0].o.value == 'test_object'
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_query_triples_get_all_pattern(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_query_triples_get_all_pattern(self, mock_kg_class):
"""Test query pattern with no constraints (get all)"""
from trustgraph.schema import TriplesQueryRequest
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
mock_result = MagicMock()
mock_result.s = 'all_subject'
mock_result.p = 'all_predicate'
mock_result.o = 'all_object'
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_tg_instance.get_all.return_value = [mock_result]
processor = Processor(taskgroup=MagicMock())
@@ -378,16 +402,20 @@ class TestCassandraQueryProcessor:
mock_launch.assert_called_once_with(default_ident, '\nTriples query service. Input is a (s, p, o, g) quad pattern, some values may be\nnull. Output is a list of quads.\n')
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_query_triples_with_authentication(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_query_triples_with_authentication(self, mock_kg_class):
"""Test querying with username and password authentication"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
# SPO query returns a list of results
mock_result = MagicMock()
mock_result.g = None
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_result.o = 'test_object'
mock_tg_instance.get_spo.return_value = [mock_result]
processor = Processor(
@@ -408,7 +436,7 @@ class TestCassandraQueryProcessor:
await processor.query_triples(query)
# Verify KnowledgeGraph was created with authentication
mock_trustgraph.assert_called_once_with(
mock_kg_class.assert_called_once_with(
hosts=['cassandra'], # Updated default
keyspace='test_user',
username='authuser',
@@ -416,16 +444,20 @@ class TestCassandraQueryProcessor:
)
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_query_triples_table_reuse(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_query_triples_table_reuse(self, mock_kg_class):
"""Test that TrustGraph is reused for same table"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
# SPO query returns a list of results
mock_result = MagicMock()
mock_result.g = None
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_result.o = 'test_object'
mock_tg_instance.get_spo.return_value = [mock_result]
processor = Processor(taskgroup=MagicMock())
@@ -441,21 +473,32 @@ class TestCassandraQueryProcessor:
# First query should create TrustGraph
await processor.query_triples(query)
assert mock_trustgraph.call_count == 1
assert mock_kg_class.call_count == 1
# Second query with same table should reuse TrustGraph
await processor.query_triples(query)
assert mock_trustgraph.call_count == 1 # Should not increase
assert mock_kg_class.call_count == 1 # Should not increase
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_query_triples_table_switching(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_query_triples_table_switching(self, mock_kg_class):
"""Test table switching creates new TrustGraph"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance1 = MagicMock()
mock_tg_instance2 = MagicMock()
mock_trustgraph.side_effect = [mock_tg_instance1, mock_tg_instance2]
mock_kg_class.side_effect = [mock_tg_instance1, mock_tg_instance2]
# Setup mock results for both instances
mock_result = MagicMock()
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_result.p = 'p'
mock_result.o = 'o'
mock_tg_instance1.get_s.return_value = [mock_result]
mock_tg_instance2.get_s.return_value = [mock_result]
processor = Processor(taskgroup=MagicMock())
@@ -486,16 +529,16 @@ class TestCassandraQueryProcessor:
assert processor.table == 'user2'
# Verify TrustGraph was created twice
assert mock_trustgraph.call_count == 2
assert mock_kg_class.call_count == 2
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_query_triples_exception_handling(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_query_triples_exception_handling(self, mock_kg_class):
"""Test exception handling during query execution"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
mock_tg_instance.get_spo.side_effect = Exception("Query failed")
processor = Processor(taskgroup=MagicMock())
@@ -513,19 +556,27 @@ class TestCassandraQueryProcessor:
await processor.query_triples(query)
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_query_triples_multiple_results(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_query_triples_multiple_results(self, mock_kg_class):
"""Test query returning multiple results"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
# Mock multiple results
mock_result1 = MagicMock()
mock_result1.o = 'object1'
mock_result1.g = ''
mock_result1.otype = None
mock_result1.dtype = None
mock_result1.lang = None
mock_result2 = MagicMock()
mock_result2.o = 'object2'
mock_result2.g = ''
mock_result2.otype = None
mock_result2.dtype = None
mock_result2.lang = None
mock_tg_instance.get_sp.return_value = [mock_result1, mock_result2]
processor = Processor(taskgroup=MagicMock())
@@ -550,16 +601,20 @@ class TestCassandraQueryPerformanceOptimizations:
"""Test cases for multi-table performance optimizations in query service"""
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_get_po_query_optimization(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_get_po_query_optimization(self, mock_kg_class):
"""Test that get_po queries use optimized table (no ALLOW FILTERING)"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
mock_result = MagicMock()
mock_result.s = 'result_subject'
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_tg_instance.get_po.return_value = [mock_result]
processor = Processor(taskgroup=MagicMock())
@@ -587,16 +642,20 @@ class TestCassandraQueryPerformanceOptimizations:
assert result[0].o.value == 'test_object'
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_get_os_query_optimization(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_get_os_query_optimization(self, mock_kg_class):
"""Test that get_os queries use optimized table (no ALLOW FILTERING)"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
mock_result = MagicMock()
mock_result.p = 'result_predicate'
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_tg_instance.get_os.return_value = [mock_result]
processor = Processor(taskgroup=MagicMock())
@@ -624,13 +683,13 @@ class TestCassandraQueryPerformanceOptimizations:
assert result[0].o.value == 'test_object'
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_all_query_patterns_use_correct_tables(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_all_query_patterns_use_correct_tables(self, mock_kg_class):
"""Test that all query patterns route to their optimal tables"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
# Mock empty results for all queries
mock_tg_instance.get_all.return_value = []
@@ -696,19 +755,23 @@ class TestCassandraQueryPerformanceOptimizations:
# Mode is determined in KnowledgeGraph initialization
@pytest.mark.asyncio
@patch('trustgraph.query.triples.cassandra.service.KnowledgeGraph')
async def test_performance_critical_po_query_no_filtering(self, mock_trustgraph):
@patch('trustgraph.query.triples.cassandra.service.EntityCentricKnowledgeGraph')
async def test_performance_critical_po_query_no_filtering(self, mock_kg_class):
"""Test the performance-critical PO query that eliminates ALLOW FILTERING"""
from trustgraph.schema import TriplesQueryRequest, Term, IRI, LITERAL
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
# Mock multiple subjects for the same predicate-object pair
mock_results = []
for i in range(5):
mock_result = MagicMock()
mock_result.s = f'subject_{i}'
mock_result.g = ''
mock_result.otype = None
mock_result.dtype = None
mock_result.lang = None
mock_results.append(mock_result)
mock_tg_instance.get_po.return_value = mock_results


@@ -6,7 +6,7 @@ import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from trustgraph.storage.triples.cassandra.write import Processor
from trustgraph.schema import Triple, LITERAL
from trustgraph.schema import Triple, LITERAL, IRI
from trustgraph.direct.cassandra_kg import DEFAULT_GRAPH
@@ -87,12 +87,12 @@ class TestCassandraStorageProcessor:
assert processor.cassandra_username == 'new-user' # Only cassandra_* params work
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
async def test_table_switching_with_auth(self, mock_trustgraph):
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
async def test_table_switching_with_auth(self, mock_kg_class):
"""Test table switching logic when authentication is provided"""
taskgroup_mock = MagicMock()
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
processor = Processor(
taskgroup=taskgroup_mock,
@@ -109,7 +109,7 @@ class TestCassandraStorageProcessor:
await processor.store_triples(mock_message)
# Verify KnowledgeGraph was called with auth parameters
mock_trustgraph.assert_called_once_with(
mock_kg_class.assert_called_once_with(
hosts=['cassandra'], # Updated default
keyspace='user1',
username='testuser',
@@ -118,12 +118,12 @@ class TestCassandraStorageProcessor:
assert processor.table == 'user1'
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
async def test_table_switching_without_auth(self, mock_trustgraph):
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
async def test_table_switching_without_auth(self, mock_kg_class):
"""Test table switching logic when no authentication is provided"""
taskgroup_mock = MagicMock()
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
processor = Processor(taskgroup=taskgroup_mock)
@@ -136,19 +136,19 @@ class TestCassandraStorageProcessor:
await processor.store_triples(mock_message)
# Verify KnowledgeGraph was called without auth parameters
mock_trustgraph.assert_called_once_with(
mock_kg_class.assert_called_once_with(
hosts=['cassandra'], # Updated default
keyspace='user2'
)
assert processor.table == 'user2'
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
async def test_table_reuse_when_same(self, mock_trustgraph):
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
async def test_table_reuse_when_same(self, mock_kg_class):
"""Test that TrustGraph is not recreated when table hasn't changed"""
taskgroup_mock = MagicMock()
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
processor = Processor(taskgroup=taskgroup_mock)
@@ -160,19 +160,19 @@ class TestCassandraStorageProcessor:
# First call should create TrustGraph
await processor.store_triples(mock_message)
assert mock_trustgraph.call_count == 1
assert mock_kg_class.call_count == 1
# Second call with same table should reuse TrustGraph
await processor.store_triples(mock_message)
assert mock_trustgraph.call_count == 1 # Should not increase
assert mock_kg_class.call_count == 1 # Should not increase
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
async def test_triple_insertion(self, mock_trustgraph):
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
async def test_triple_insertion(self, mock_kg_class):
"""Test that triples are properly inserted into Cassandra"""
taskgroup_mock = MagicMock()
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
processor = Processor(taskgroup=taskgroup_mock)
@@ -180,19 +180,27 @@ class TestCassandraStorageProcessor:
triple1 = MagicMock()
triple1.s.type = LITERAL
triple1.s.value = 'subject1'
triple1.s.datatype = ''
triple1.s.language = ''
triple1.p.type = LITERAL
triple1.p.value = 'predicate1'
triple1.o.type = LITERAL
triple1.o.value = 'object1'
triple1.o.datatype = ''
triple1.o.language = ''
triple1.g = None
triple2 = MagicMock()
triple2.s.type = LITERAL
triple2.s.value = 'subject2'
triple2.s.datatype = ''
triple2.s.language = ''
triple2.p.type = LITERAL
triple2.p.value = 'predicate2'
triple2.o.type = LITERAL
triple2.o.value = 'object2'
triple2.o.datatype = ''
triple2.o.language = ''
triple2.g = None
# Create mock message
@@ -203,18 +211,24 @@ class TestCassandraStorageProcessor:
await processor.store_triples(mock_message)
# Verify both triples were inserted (with g= parameter)
# Verify both triples were inserted (with g=, otype=, dtype=, lang= parameters)
assert mock_tg_instance.insert.call_count == 2
mock_tg_instance.insert.assert_any_call('collection1', 'subject1', 'predicate1', 'object1', g=DEFAULT_GRAPH)
mock_tg_instance.insert.assert_any_call('collection1', 'subject2', 'predicate2', 'object2', g=DEFAULT_GRAPH)
mock_tg_instance.insert.assert_any_call(
'collection1', 'subject1', 'predicate1', 'object1',
g=DEFAULT_GRAPH, otype='l', dtype='', lang=''
)
mock_tg_instance.insert.assert_any_call(
'collection1', 'subject2', 'predicate2', 'object2',
g=DEFAULT_GRAPH, otype='l', dtype='', lang=''
)
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
async def test_triple_insertion_with_empty_list(self, mock_trustgraph):
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
async def test_triple_insertion_with_empty_list(self, mock_kg_class):
"""Test behavior when message has no triples"""
taskgroup_mock = MagicMock()
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
processor = Processor(taskgroup=taskgroup_mock)
@@ -230,12 +244,12 @@ class TestCassandraStorageProcessor:
mock_tg_instance.insert.assert_not_called()
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
@patch('trustgraph.storage.triples.cassandra.write.time.sleep')
async def test_exception_handling_with_retry(self, mock_sleep, mock_trustgraph):
async def test_exception_handling_with_retry(self, mock_sleep, mock_kg_class):
"""Test exception handling during TrustGraph creation"""
taskgroup_mock = MagicMock()
mock_trustgraph.side_effect = Exception("Connection failed")
mock_kg_class.side_effect = Exception("Connection failed")
processor = Processor(taskgroup=taskgroup_mock)
@@ -335,13 +349,13 @@ class TestCassandraStorageProcessor:
mock_launch.assert_called_once_with(default_ident, '\nGraph writer. Input is graph edge. Writes edges to Cassandra graph.\n')
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
async def test_store_triples_table_switching_between_different_tables(self, mock_trustgraph):
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
async def test_store_triples_table_switching_between_different_tables(self, mock_kg_class):
"""Test table switching when different tables are used in sequence"""
taskgroup_mock = MagicMock()
mock_tg_instance1 = MagicMock()
mock_tg_instance2 = MagicMock()
mock_trustgraph.side_effect = [mock_tg_instance1, mock_tg_instance2]
mock_kg_class.side_effect = [mock_tg_instance1, mock_tg_instance2]
processor = Processor(taskgroup=taskgroup_mock)
@@ -366,15 +380,15 @@ class TestCassandraStorageProcessor:
assert processor.tg == mock_tg_instance2
# Verify TrustGraph was created twice for different tables
assert mock_trustgraph.call_count == 2
assert mock_kg_class.call_count == 2
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
async def test_store_triples_with_special_characters_in_values(self, mock_trustgraph):
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
async def test_store_triples_with_special_characters_in_values(self, mock_kg_class):
"""Test storing triples with special characters and unicode"""
taskgroup_mock = MagicMock()
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
processor = Processor(taskgroup=taskgroup_mock)
@@ -382,10 +396,14 @@ class TestCassandraStorageProcessor:
triple = MagicMock()
triple.s.type = LITERAL
triple.s.value = 'subject with spaces & symbols'
triple.s.datatype = ''
triple.s.language = ''
triple.p.type = LITERAL
triple.p.value = 'predicate:with/colons'
triple.o.type = LITERAL
triple.o.value = 'object with "quotes" and unicode: ñáéíóú'
triple.o.datatype = ''
triple.o.language = ''
triple.g = None
mock_message = MagicMock()
@@ -401,12 +419,15 @@ class TestCassandraStorageProcessor:
'subject with spaces & symbols',
'predicate:with/colons',
'object with "quotes" and unicode: ñáéíóú',
g=DEFAULT_GRAPH
g=DEFAULT_GRAPH,
otype='l',
dtype='',
lang=''
)
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
async def test_store_triples_preserves_old_table_on_exception(self, mock_trustgraph):
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
async def test_store_triples_preserves_old_table_on_exception(self, mock_kg_class):
"""Test that table remains unchanged when TrustGraph creation fails"""
taskgroup_mock = MagicMock()
@@ -416,7 +437,7 @@ class TestCassandraStorageProcessor:
processor.table = ('old_user', 'old_collection')
# Mock TrustGraph to raise exception
mock_trustgraph.side_effect = Exception("Connection failed")
mock_kg_class.side_effect = Exception("Connection failed")
mock_message = MagicMock()
mock_message.metadata.user = 'new_user'
@@ -436,12 +457,12 @@ class TestCassandraPerformanceOptimizations:
"""Test cases for multi-table performance optimizations"""
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
async def test_legacy_mode_uses_single_table(self, mock_trustgraph):
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
async def test_legacy_mode_uses_single_table(self, mock_kg_class):
"""Test that legacy mode still works with single table"""
taskgroup_mock = MagicMock()
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
with patch.dict('os.environ', {'CASSANDRA_USE_LEGACY': 'true'}):
processor = Processor(taskgroup=taskgroup_mock)
@@ -454,16 +475,15 @@ class TestCassandraPerformanceOptimizations:
await processor.store_triples(mock_message)
# Verify KnowledgeGraph instance uses legacy mode
kg_instance = mock_trustgraph.return_value
assert kg_instance is not None
assert mock_tg_instance is not None
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
async def test_optimized_mode_uses_multi_table(self, mock_trustgraph):
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
async def test_optimized_mode_uses_multi_table(self, mock_kg_class):
"""Test that optimized mode uses multi-table schema"""
taskgroup_mock = MagicMock()
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
with patch.dict('os.environ', {'CASSANDRA_USE_LEGACY': 'false'}):
processor = Processor(taskgroup=taskgroup_mock)
@@ -476,16 +496,15 @@ class TestCassandraPerformanceOptimizations:
await processor.store_triples(mock_message)
# Verify KnowledgeGraph instance is in optimized mode
kg_instance = mock_trustgraph.return_value
assert kg_instance is not None
assert mock_tg_instance is not None
@pytest.mark.asyncio
@patch('trustgraph.storage.triples.cassandra.write.KnowledgeGraph')
async def test_batch_write_consistency(self, mock_trustgraph):
@patch('trustgraph.storage.triples.cassandra.write.EntityCentricKnowledgeGraph')
async def test_batch_write_consistency(self, mock_kg_class):
"""Test that all tables stay consistent during batch writes"""
taskgroup_mock = MagicMock()
mock_tg_instance = MagicMock()
mock_trustgraph.return_value = mock_tg_instance
mock_kg_class.return_value = mock_tg_instance
processor = Processor(taskgroup=taskgroup_mock)
@@ -493,10 +512,14 @@ class TestCassandraPerformanceOptimizations:
triple = MagicMock()
triple.s.type = LITERAL
triple.s.value = 'test_subject'
triple.s.datatype = ''
triple.s.language = ''
triple.p.type = LITERAL
triple.p.value = 'test_predicate'
triple.o.type = LITERAL
triple.o.value = 'test_object'
triple.o.datatype = ''
triple.o.language = ''
triple.g = None
mock_message = MagicMock()
@@ -509,7 +532,7 @@ class TestCassandraPerformanceOptimizations:
# Verify insert was called for the triple (implementation details tested in KnowledgeGraph)
mock_tg_instance.insert.assert_called_once_with(
'collection1', 'test_subject', 'test_predicate', 'test_object',
g=DEFAULT_GRAPH
g=DEFAULT_GRAPH, otype='l', dtype='', lang=''
)
def test_environment_variable_controls_mode(self):


@@ -20,6 +20,11 @@ DEFAULT_GRAPH = ""
class KnowledgeGraph:
"""
REDUNDANT: This 7-table implementation has been superseded by
EntityCentricKnowledgeGraph which uses a more efficient 2-table model.
This class is retained temporarily for reference but should not be used
for new deployments.
Cassandra-backed knowledge graph supporting quads (s, p, o, g).
Uses 7 tables to support all 16 query patterns efficiently:
@@ -516,3 +521,575 @@ class KnowledgeGraph:
self.cluster.shutdown()
if self.cluster in _active_clusters:
_active_clusters.remove(self.cluster)
class EntityCentricKnowledgeGraph:
"""
Entity-centric Cassandra-backed knowledge graph supporting quads (s, p, o, g).
Uses 2 tables instead of 7:
- quads_by_entity: every entity knows every quad it participates in
- quads_by_collection: manifest for collection-level queries and deletion
Supports all 16 query patterns with single-partition reads.
"""
def __init__(
self, hosts=None,
keyspace="trustgraph", username=None, password=None
):
if hosts is None:
hosts = ["localhost"]
self.keyspace = keyspace
self.username = username
# 2-table entity-centric schema
self.entity_table = "quads_by_entity"
self.collection_table = "quads_by_collection"
# Collection metadata tracking
self.collection_metadata_table = "collection_metadata"
if username and password:
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
auth_provider = PlainTextAuthProvider(username=username, password=password)
self.cluster = Cluster(hosts, auth_provider=auth_provider, ssl_context=ssl_context)
else:
self.cluster = Cluster(hosts)
self.session = self.cluster.connect()
# Track this cluster globally
_active_clusters.append(self.cluster)
self.init()
self.prepare_statements()
def clear(self):
self.session.execute(f"""
drop keyspace if exists {self.keyspace};
""")
self.init()
def init(self):
self.session.execute(f"""
create keyspace if not exists {self.keyspace}
with replication = {{
'class' : 'SimpleStrategy',
'replication_factor' : 1
}};
""")
self.session.set_keyspace(self.keyspace)
self.init_entity_centric_schema()
def init_entity_centric_schema(self):
"""Initialize 2-table entity-centric schema"""
# quads_by_entity: primary data table
# Every entity has a partition containing all quads it participates in
self.session.execute(f"""
CREATE TABLE IF NOT EXISTS {self.entity_table} (
collection text,
entity text,
role text,
p text,
otype text,
s text,
o text,
d text,
dtype text,
lang text,
PRIMARY KEY ((collection, entity), role, p, otype, s, o, d)
);
""")
# quads_by_collection: manifest for collection-level queries and deletion
self.session.execute(f"""
CREATE TABLE IF NOT EXISTS {self.collection_table} (
collection text,
d text,
s text,
p text,
o text,
otype text,
dtype text,
lang text,
PRIMARY KEY (collection, d, s, p, o)
);
""")
# Collection metadata tracking
self.session.execute(f"""
CREATE TABLE IF NOT EXISTS {self.collection_metadata_table} (
collection text,
created_at timestamp,
PRIMARY KEY (collection)
);
""")
logger.info("Entity-centric schema initialized (2 tables + metadata)")
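The fan-out implied by this schema can be sketched in isolation. The following is a minimal illustrative sketch, not code from this commit: it expands one quad into the rows written to `quads_by_entity`, following the rule stated in the `insert()` docstring that each participating entity gets one row and that literal objects get no `O` row. The `expand_quad` name and the tuple layout are assumptions for illustration.

```python
# Hypothetical sketch of the per-entity fan-out for quads_by_entity.
# Column order mirrors the table's key columns: (collection, entity, role, p, otype, s, o, d).

def expand_quad(collection, s, p, o, g="", otype="u"):
    """Return the rows a single quad (d, s, p, o) contributes to quads_by_entity."""
    rows = [
        (collection, s, "S", p, otype, s, o, g),  # subject's partition
        (collection, p, "P", p, otype, s, o, g),  # predicate's partition
        (collection, g, "G", p, otype, s, o, g),  # graph's partition
    ]
    # Literals are not independently queryable entities, so no 'O' row.
    if otype != "l":
        rows.append((collection, o, "O", p, otype, s, o, g))
    return rows

# A URI object yields 4 rows; a literal object yields 3.
uri_rows = expand_quad("c1", "ex:Alice", "ex:knows", "ex:Bob", otype="u")
lit_rows = expand_quad("c1", "ex:Alice", "rdfs:label", "Alice", otype="l")
```

Each row lands in a different partition keyed by `(collection, entity)`, which is what lets any query with a known element hit a single partition.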
def prepare_statements(self):
"""Prepare statements for entity-centric schema"""
# Insert statement for quads_by_entity
self.insert_entity_stmt = self.session.prepare(
f"INSERT INTO {self.entity_table} "
"(collection, entity, role, p, otype, s, o, d, dtype, lang) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
)
# Insert statement for quads_by_collection
self.insert_collection_stmt = self.session.prepare(
f"INSERT INTO {self.collection_table} "
"(collection, d, s, p, o, otype, dtype, lang) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
)
# Query statements for quads_by_entity
# Get all quads for an entity (any role)
self.get_entity_all_stmt = self.session.prepare(
f"SELECT role, p, otype, s, o, d, dtype, lang FROM {self.entity_table} "
"WHERE collection = ? AND entity = ? LIMIT ?"
)
# Get quads where entity is subject (role='S')
self.get_entity_as_s_stmt = self.session.prepare(
f"SELECT p, otype, s, o, d, dtype, lang FROM {self.entity_table} "
"WHERE collection = ? AND entity = ? AND role = 'S' LIMIT ?"
)
# Get quads where entity is subject with specific predicate
self.get_entity_as_s_p_stmt = self.session.prepare(
f"SELECT otype, s, o, d, dtype, lang FROM {self.entity_table} "
"WHERE collection = ? AND entity = ? AND role = 'S' AND p = ? LIMIT ?"
)
# Get quads where entity is subject with specific predicate and otype
self.get_entity_as_s_p_otype_stmt = self.session.prepare(
f"SELECT s, o, d, dtype, lang FROM {self.entity_table} "
"WHERE collection = ? AND entity = ? AND role = 'S' AND p = ? AND otype = ? LIMIT ?"
)
# Get quads where entity is predicate (role='P')
self.get_entity_as_p_stmt = self.session.prepare(
f"SELECT p, otype, s, o, d, dtype, lang FROM {self.entity_table} "
"WHERE collection = ? AND entity = ? AND role = 'P' LIMIT ?"
)
# Get quads where entity is object (role='O')
self.get_entity_as_o_stmt = self.session.prepare(
f"SELECT p, otype, s, o, d, dtype, lang FROM {self.entity_table} "
"WHERE collection = ? AND entity = ? AND role = 'O' LIMIT ?"
)
# Get quads where entity is object with specific predicate
self.get_entity_as_o_p_stmt = self.session.prepare(
f"SELECT otype, s, o, d, dtype, lang FROM {self.entity_table} "
"WHERE collection = ? AND entity = ? AND role = 'O' AND p = ? LIMIT ?"
)
# Get quads where entity is graph (role='G')
self.get_entity_as_g_stmt = self.session.prepare(
f"SELECT p, otype, s, o, d, dtype, lang FROM {self.entity_table} "
"WHERE collection = ? AND entity = ? AND role = 'G' LIMIT ?"
)
# Query statements for quads_by_collection
# Get all quads in collection
self.get_collection_all_stmt = self.session.prepare(
f"SELECT d, s, p, o, otype, dtype, lang FROM {self.collection_table} "
"WHERE collection = ? LIMIT ?"
)
# Get all quads in a specific graph
self.get_collection_by_graph_stmt = self.session.prepare(
f"SELECT s, p, o, otype, dtype, lang FROM {self.collection_table} "
"WHERE collection = ? AND d = ? LIMIT ?"
)
# Delete statements
self.delete_entity_partition_stmt = self.session.prepare(
f"DELETE FROM {self.entity_table} WHERE collection = ? AND entity = ?"
)
self.delete_collection_row_stmt = self.session.prepare(
f"DELETE FROM {self.collection_table} WHERE collection = ? AND d = ? AND s = ? AND p = ? AND o = ?"
)
logger.info("Prepared statements initialized for entity-centric schema")
def insert(self, collection, s, p, o, g=None, otype=None, dtype="", lang=""):
"""
Insert a quad into entity-centric tables.
Writes up to 4 rows to quads_by_entity (one per entity role) plus 1 row
to quads_by_collection. Literal objects get no 'O' row, since literals
are not independently queryable entities, and the default graph gets no
'G' row.
Args:
collection: Collection/tenant scope
s: Subject (string value)
p: Predicate (string value)
o: Object (string value)
g: Graph/dataset (None for default graph)
otype: Object type - 'u' (URI), 'l' (literal), 't' (triple)
Auto-detected from o value if not provided
dtype: XSD datatype (for literals)
lang: Language tag (for literals)
"""
# Default graph stored as empty string
if g is None:
g = DEFAULT_GRAPH
# Auto-detect otype if not provided (backwards compatibility)
if otype is None:
if o.startswith("http://") or o.startswith("https://"):
otype = "u"
else:
otype = "l"
batch = BatchStatement()
# Write row for subject entity (role='S')
batch.add(self.insert_entity_stmt, (
collection, s, 'S', p, otype, s, o, g, dtype, lang
))
# Write row for predicate entity (role='P')
batch.add(self.insert_entity_stmt, (
collection, p, 'P', p, otype, s, o, g, dtype, lang
))
# Write row for object entity (role='O') - only for URIs, not literals
if otype == 'u' or otype == 't':
batch.add(self.insert_entity_stmt, (
collection, o, 'O', p, otype, s, o, g, dtype, lang
))
# Write row for graph entity (role='G') - only for non-default graphs
if g != DEFAULT_GRAPH:
batch.add(self.insert_entity_stmt, (
collection, g, 'G', p, otype, s, o, g, dtype, lang
))
# Write row to quads_by_collection
batch.add(self.insert_collection_stmt, (
collection, g, s, p, o, otype, dtype, lang
))
self.session.execute(batch)
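# Example (illustrative sketch; the values are hypothetical): inserting
#   kg.insert("acme", "http://ex/alice", "http://ex/knows",
#             "http://ex/bob", g="http://ex/g1")
# writes four quads_by_entity rows keyed by (collection, entity, role):
#   ("acme", "http://ex/alice", 'S'), ("acme", "http://ex/knows", 'P'),
#   ("acme", "http://ex/bob", 'O'), ("acme", "http://ex/g1", 'G')
# plus one quads_by_collection row, so any single known element of the
# quad resolves directly to a partition key.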
# ========================================================================
# Query methods
# g=None means default graph, g="*" means all graphs
# Results include otype, dtype, lang for proper Term reconstruction
# ========================================================================
def get_all(self, collection, limit=50):
"""Get all quads in collection"""
return self.session.execute(self.get_collection_all_stmt, (collection, limit))
def get_s(self, collection, s, g=None, limit=10):
"""
Query by subject. Returns quads where s is the subject.
g=None: default graph, g='*': all graphs
"""
rows = self.session.execute(self.get_entity_as_s_stmt, (collection, s, limit))
results = []
for row in rows:
d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH
# Filter by graph if specified
if g is None or g == DEFAULT_GRAPH:
if d != DEFAULT_GRAPH:
continue
elif g != GRAPH_WILDCARD and d != g:
continue
results.append(QuadResult(
s=row.s, p=row.p, o=row.o, g=d,
otype=row.otype, dtype=row.dtype, lang=row.lang
))
return results
def get_p(self, collection, p, g=None, limit=10):
"""Query by predicate"""
rows = self.session.execute(self.get_entity_as_p_stmt, (collection, p, limit))
results = []
for row in rows:
d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH
if g is None or g == DEFAULT_GRAPH:
if d != DEFAULT_GRAPH:
continue
elif g != GRAPH_WILDCARD and d != g:
continue
results.append(QuadResult(
s=row.s, p=row.p, o=row.o, g=d,
otype=row.otype, dtype=row.dtype, lang=row.lang
))
return results
def get_o(self, collection, o, g=None, limit=10):
"""Query by object"""
rows = self.session.execute(self.get_entity_as_o_stmt, (collection, o, limit))
results = []
for row in rows:
d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH
if g is None or g == DEFAULT_GRAPH:
if d != DEFAULT_GRAPH:
continue
elif g != GRAPH_WILDCARD and d != g:
continue
results.append(QuadResult(
s=row.s, p=row.p, o=row.o, g=d,
otype=row.otype, dtype=row.dtype, lang=row.lang
))
return results
def get_sp(self, collection, s, p, g=None, limit=10):
"""Query by subject and predicate"""
rows = self.session.execute(self.get_entity_as_s_p_stmt, (collection, s, p, limit))
results = []
for row in rows:
d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH
if g is None or g == DEFAULT_GRAPH:
if d != DEFAULT_GRAPH:
continue
elif g != GRAPH_WILDCARD and d != g:
continue
results.append(QuadResult(
s=s, p=p, o=row.o, g=d,
otype=row.otype, dtype=row.dtype, lang=row.lang
))
return results
def get_po(self, collection, p, o, g=None, limit=10):
"""Query by predicate and object"""
rows = self.session.execute(self.get_entity_as_o_p_stmt, (collection, o, p, limit))
results = []
for row in rows:
d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH
if g is None or g == DEFAULT_GRAPH:
if d != DEFAULT_GRAPH:
continue
elif g != GRAPH_WILDCARD and d != g:
continue
results.append(QuadResult(
s=row.s, p=p, o=o, g=d,
otype=row.otype, dtype=row.dtype, lang=row.lang
))
return results
def get_os(self, collection, o, s, g=None, limit=10):
"""Query by object and subject"""
# Use the subject partition with role='S' and filter rows by o client-side.
# Note: LIMIT applies before this filter, so fewer than limit matches may be returned.
rows = self.session.execute(self.get_entity_as_s_stmt, (collection, s, limit))
results = []
for row in rows:
if row.o != o:
continue
d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH
if g is None or g == DEFAULT_GRAPH:
if d != DEFAULT_GRAPH:
continue
elif g != GRAPH_WILDCARD and d != g:
continue
results.append(QuadResult(
s=s, p=row.p, o=o, g=d,
otype=row.otype, dtype=row.dtype, lang=row.lang
))
return results
def get_spo(self, collection, s, p, o, g=None, limit=10):
"""Query by subject, predicate, object (find which graphs)"""
rows = self.session.execute(self.get_entity_as_s_p_stmt, (collection, s, p, limit))
results = []
for row in rows:
if row.o != o:
continue
d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH
if g is None or g == DEFAULT_GRAPH:
if d != DEFAULT_GRAPH:
continue
elif g != GRAPH_WILDCARD and d != g:
continue
results.append(QuadResult(
s=s, p=p, o=o, g=d,
otype=row.otype, dtype=row.dtype, lang=row.lang
))
return results
def get_g(self, collection, g, limit=50):
"""Get all quads in a specific graph"""
if g is None:
g = DEFAULT_GRAPH
return self.session.execute(self.get_collection_by_graph_stmt, (collection, g, limit))
# ========================================================================
# Collection management
# ========================================================================
def collection_exists(self, collection):
"""Check if collection exists"""
try:
result = self.session.execute(
f"SELECT collection FROM {self.collection_metadata_table} WHERE collection = %s LIMIT 1",
(collection,)
)
return bool(list(result))
except Exception as e:
logger.error(f"Error checking collection existence: {e}")
return False
def create_collection(self, collection):
"""Create collection by inserting metadata row"""
try:
import datetime
self.session.execute(
f"INSERT INTO {self.collection_metadata_table} (collection, created_at) VALUES (%s, %s)",
(collection, datetime.datetime.now())
)
logger.info(f"Created collection metadata for {collection}")
except Exception as e:
logger.error(f"Error creating collection: {e}")
raise
def delete_collection(self, collection):
"""
Delete all quads for a collection from both tables.
Uses efficient partition-level deletes:
1. Read quads from quads_by_collection to get all quads
2. Extract unique entities (s, p, o for URIs, g for non-default)
3. Delete entire entity partitions
4. Delete collection rows
"""
# Read all quads from collection table
rows = self.session.execute(
f"SELECT d, s, p, o, otype FROM {self.collection_table} WHERE collection = %s",
(collection,)
)
# Collect unique entities and quad data for deletion
entities = set()
quads = []
for row in rows:
d, s, p, o, otype = row.d, row.s, row.p, row.o, row.otype
quads.append((d, s, p, o))
# Subject and predicate are always entities
entities.add(s)
entities.add(p)
# Object is an entity only for URIs
if otype == 'u' or otype == 't':
entities.add(o)
# Graph is an entity for non-default graphs
if d != DEFAULT_GRAPH:
entities.add(d)
# Delete entity partitions (efficient partition-level deletes)
batch = BatchStatement()
count = 0
for entity in entities:
batch.add(self.delete_entity_partition_stmt, (collection, entity))
count += 1
# Execute batch every 50 entities
if count % 50 == 0:
self.session.execute(batch)
batch = BatchStatement()
# Execute remaining entity deletes
if count % 50 != 0:
self.session.execute(batch)
# Delete collection rows
batch = BatchStatement()
count = 0
for d, s, p, o in quads:
batch.add(self.delete_collection_row_stmt, (collection, d, s, p, o))
count += 1
# Execute batch every 50 quads
if count % 50 == 0:
self.session.execute(batch)
batch = BatchStatement()
# Execute remaining collection row deletes
if count % 50 != 0:
self.session.execute(batch)
# Delete collection metadata
self.session.execute(
f"DELETE FROM {self.collection_metadata_table} WHERE collection = %s",
(collection,)
)
logger.info(f"Deleted collection {collection}: {len(entities)} entity partitions, {len(quads)} quads")
def close(self):
"""Close connections"""
if hasattr(self, 'session') and self.session:
self.session.shutdown()
cluster = getattr(self, 'cluster', None)
if cluster:
cluster.shutdown()
if cluster in _active_clusters:
_active_clusters.remove(cluster)
class QuadResult:
"""
Result object for quad queries, including object type metadata.
Attributes:
s: Subject value
p: Predicate value
o: Object value
g: Graph/dataset value
otype: Object type - 'u' (URI), 'l' (literal), 't' (triple)
dtype: XSD datatype (for literals)
lang: Language tag (for literals)
"""
def __init__(self, s, p, o, g, otype='u', dtype='', lang=''):
self.s = s
self.p = p
self.o = o
self.g = g
self.otype = otype
self.dtype = dtype
self.lang = lang
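The per-row graph filter repeated in each `get_*` method above can be summarised as a standalone predicate. This is an illustrative sketch using the documented conventions (default graph stored as the empty string, `"*"` as the all-graphs wildcard); `graph_matches` is a hypothetical helper name, not part of the class.

```python
# Sketch of the graph-filter rule used by the query methods:
# g=None -> default graph only, g='*' -> all graphs, otherwise exact match.
DEFAULT_GRAPH = ""
GRAPH_WILDCARD = "*"

def graph_matches(d, g):
    """Return True if a row stored in graph d passes the caller's g filter."""
    if g is None or g == DEFAULT_GRAPH:
        return d == DEFAULT_GRAPH   # default graph requested
    if g == GRAPH_WILDCARD:
        return True                 # wildcard: every graph matches
    return d == g                   # specific named graph
```

Factoring the filter out this way would also let the duplicated branch in `get_s`/`get_p`/`get_o`/`get_sp`/`get_po` be tested in isolation.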


@@ -6,7 +6,9 @@ null. Output is a list of quads.
import logging
from ....direct.cassandra_kg import KnowledgeGraph, GRAPH_WILDCARD, DEFAULT_GRAPH
from ....direct.cassandra_kg import (
EntityCentricKnowledgeGraph, GRAPH_WILDCARD, DEFAULT_GRAPH
)
from ....schema import TriplesQueryRequest, TriplesQueryResponse, Error
from ....schema import Term, Triple, IRI, LITERAL
from ....base import TriplesQueryService
@@ -31,8 +33,37 @@ def get_term_value(term):
return term.id or term.value
def create_term(value):
"""Create a Term from a string value"""
def create_term(value, otype=None, dtype=None, lang=None):
"""
Create a Term from a string value, optionally using type metadata.
Args:
value: The string value
otype: Object type - 'u' (URI), 'l' (literal), 't' (triple)
dtype: XSD datatype (for literals)
lang: Language tag (for literals)
If otype is provided, uses it to determine Term type.
Otherwise falls back to URL detection heuristic.
"""
if otype is not None:
if otype == 'u':
return Term(type=IRI, iri=value)
elif otype == 'l':
return Term(
type=LITERAL,
value=value,
datatype=dtype or "",
language=lang or ""
)
elif otype == 't':
# Triple/reification - treat as IRI for now
return Term(type=IRI, iri=value)
else:
# Unknown otype, fall back to heuristic
pass
# Heuristic fallback for backwards compatibility
if value.startswith("http://") or value.startswith("https://"):
return Term(type=IRI, iri=value)
else:
@@ -74,14 +105,17 @@ class Processor(TriplesQueryService):
user = query.user
if user != self.table:
# Use factory function to select implementation
KGClass = EntityCentricKnowledgeGraph
if self.cassandra_username and self.cassandra_password:
self.tg = KnowledgeGraph(
self.tg = KGClass(
hosts=self.cassandra_host,
keyspace=query.user,
username=self.cassandra_username, password=self.cassandra_password
)
else:
self.tg = KnowledgeGraph(
self.tg = KGClass(
hosts=self.cassandra_host,
keyspace=query.user,
)
@@ -93,6 +127,14 @@ class Processor(TriplesQueryService):
o_val = get_term_value(query.o)
g_val = query.g # Already a string or None
# Helper to extract object metadata from result row
def get_o_metadata(t):
"""Extract otype/dtype/lang from result row if available"""
otype = getattr(t, 'otype', None)
dtype = getattr(t, 'dtype', None)
lang = getattr(t, 'lang', None)
return otype, dtype, lang
quads = []
# Route to appropriate query method based on which fields are specified
@@ -106,7 +148,8 @@ class Processor(TriplesQueryService):
)
for t in resp:
g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH
quads.append((s_val, p_val, o_val, g))
otype, dtype, lang = get_o_metadata(t)
quads.append((s_val, p_val, o_val, g, otype, dtype, lang))
else:
# SP specified
resp = self.tg.get_sp(
@@ -115,7 +158,8 @@ class Processor(TriplesQueryService):
)
for t in resp:
g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH
quads.append((s_val, p_val, t.o, g))
otype, dtype, lang = get_o_metadata(t)
quads.append((s_val, p_val, t.o, g, otype, dtype, lang))
else:
if o_val is not None:
# SO specified
@@ -125,7 +169,8 @@ class Processor(TriplesQueryService):
)
for t in resp:
g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH
quads.append((s_val, t.p, o_val, g))
otype, dtype, lang = get_o_metadata(t)
quads.append((s_val, t.p, o_val, g, otype, dtype, lang))
else:
# S only
resp = self.tg.get_s(
@@ -134,7 +179,8 @@ class Processor(TriplesQueryService):
)
for t in resp:
g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH
quads.append((s_val, t.p, t.o, g))
otype, dtype, lang = get_o_metadata(t)
quads.append((s_val, t.p, t.o, g, otype, dtype, lang))
else:
if p_val is not None:
if o_val is not None:
@@ -145,7 +191,8 @@ class Processor(TriplesQueryService):
)
for t in resp:
g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH
quads.append((t.s, p_val, o_val, g))
otype, dtype, lang = get_o_metadata(t)
quads.append((t.s, p_val, o_val, g, otype, dtype, lang))
else:
# P only
resp = self.tg.get_p(
@@ -154,7 +201,8 @@ class Processor(TriplesQueryService):
)
for t in resp:
g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH
quads.append((t.s, p_val, t.o, g))
otype, dtype, lang = get_o_metadata(t)
quads.append((t.s, p_val, t.o, g, otype, dtype, lang))
else:
if o_val is not None:
# O only
@@ -164,7 +212,8 @@ class Processor(TriplesQueryService):
)
for t in resp:
g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH
quads.append((t.s, t.p, o_val, g))
otype, dtype, lang = get_o_metadata(t)
quads.append((t.s, t.p, o_val, g, otype, dtype, lang))
else:
# Nothing specified - get all
resp = self.tg.get_all(
@@ -173,14 +222,16 @@ class Processor(TriplesQueryService):
)
for t in resp:
g = t.g if hasattr(t, 'g') else DEFAULT_GRAPH
quads.append((t.s, t.p, t.o, g))
otype, dtype, lang = get_o_metadata(t)
quads.append((t.s, t.p, t.o, g, otype, dtype, lang))
# Convert to Triple objects (with g field)
# Use otype/dtype/lang for proper Term reconstruction if available
triples = [
Triple(
s=create_term(q[0]),
p=create_term(q[1]),
o=create_term(q[2]),
o=create_term(q[2], otype=q[4], dtype=q[5], lang=q[6]),
g=q[3] if q[3] != DEFAULT_GRAPH else None
)
for q in quads


@@ -10,12 +10,14 @@ import argparse
import time
import logging
from ....direct.cassandra_kg import KnowledgeGraph, DEFAULT_GRAPH
from ....direct.cassandra_kg import (
EntityCentricKnowledgeGraph, DEFAULT_GRAPH
)
from ....base import TriplesStoreService, CollectionConfigHandler
from ....base import AsyncProcessor, Consumer, Producer
from ....base import ConsumerMetrics, ProducerMetrics
from ....base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from ....schema import IRI, LITERAL
from ....schema import IRI, LITERAL, BLANK, TRIPLE
# Module logger
logger = logging.getLogger(__name__)
@@ -36,6 +38,46 @@ def get_term_value(term):
return term.id or term.value
def get_term_otype(term):
"""
Get object type code from a Term for entity-centric storage.
Maps Term.type to otype:
- IRI ("i") "u" (URI)
- BLANK ("b") "u" (treated as URI)
- LITERAL ("l") "l" (Literal)
- TRIPLE ("t") "t" (Triple/reification)
"""
if term is None:
return "u"
if term.type == IRI or term.type == BLANK:
return "u"
elif term.type == LITERAL:
return "l"
elif term.type == TRIPLE:
return "t"
else:
return "u"
def get_term_dtype(term):
"""Extract datatype from a Term (for literals)"""
if term is None:
return ""
if term.type == LITERAL:
return term.datatype or ""
return ""
def get_term_lang(term):
"""Extract language tag from a Term (for literals)"""
if term is None:
return ""
if term.type == LITERAL:
return term.language or ""
return ""
class Processor(CollectionConfigHandler, TriplesStoreService):
def __init__(self, **params):
@@ -78,15 +120,18 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
self.tg = None
# Use factory function to select implementation
KGClass = EntityCentricKnowledgeGraph
try:
if self.cassandra_username and self.cassandra_password:
self.tg = KnowledgeGraph(
self.tg = KGClass(
hosts=self.cassandra_host,
keyspace=message.metadata.user,
username=self.cassandra_username, password=self.cassandra_password
)
else:
self.tg = KnowledgeGraph(
self.tg = KGClass(
hosts=self.cassandra_host,
keyspace=message.metadata.user,
)
@@ -105,12 +150,20 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
# t.g is None for default graph, or a graph IRI
g_val = t.g if t.g is not None else DEFAULT_GRAPH
# Extract object type metadata for entity-centric storage
otype = get_term_otype(t.o)
dtype = get_term_dtype(t.o)
lang = get_term_lang(t.o)
self.tg.insert(
message.metadata.collection,
s_val,
p_val,
o_val,
g=g_val
g=g_val,
otype=otype,
dtype=dtype,
lang=lang
)
async def create_collection(self, user: str, collection: str, metadata: dict):
@@ -120,16 +173,19 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
if self.table is None or self.table != user:
self.tg = None
# Use factory function to select implementation
KGClass = EntityCentricKnowledgeGraph
try:
if self.cassandra_username and self.cassandra_password:
self.tg = KnowledgeGraph(
self.tg = KGClass(
hosts=self.cassandra_host,
keyspace=user,
username=self.cassandra_username,
password=self.cassandra_password
)
else:
self.tg = KnowledgeGraph(
self.tg = KGClass(
hosts=self.cassandra_host,
keyspace=user,
)
@@ -159,16 +215,19 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
if self.table is None or self.table != user:
self.tg = None
# Use factory function to select implementation
KGClass = EntityCentricKnowledgeGraph
try:
if self.cassandra_username and self.cassandra_password:
self.tg = KnowledgeGraph(
self.tg = KGClass(
hosts=self.cassandra_host,
keyspace=user,
username=self.cassandra_username,
password=self.cassandra_password
)
else:
self.tg = KnowledgeGraph(
self.tg = KGClass(
hosts=self.cassandra_host,
keyspace=user,
)