diff --git a/docs/tech-specs/entity-centric-graph.md b/docs/tech-specs/entity-centric-graph.md index aa695811..33f500f6 100644 --- a/docs/tech-specs/entity-centric-graph.md +++ b/docs/tech-specs/entity-centric-graph.md @@ -42,7 +42,7 @@ CREATE TABLE quads_by_entity ( d text, -- Dataset/graph of the quad dtype text, -- XSD datatype (when otype = 'L'), e.g. 'xsd:string' lang text, -- Language tag (when otype = 'L'), e.g. 'en', 'fr' - PRIMARY KEY ((collection, entity), role, p, otype, s, o, d) + PRIMARY KEY ((collection, entity), role, p, otype, s, o, d, dtype, lang) ); ``` @@ -54,6 +54,7 @@ CREATE TABLE quads_by_entity ( 2. **p** — next most common filter, "give me all `knows` relationships" 3. **otype** — enables filtering by URI-valued vs literal-valued relationships 4. **s, o, d** — remaining columns for uniqueness +5. **dtype, lang** — distinguish literals with same value but different type metadata (e.g., `"thing"` vs `"thing"@en` vs `"thing"^^xsd:string`) ### Table 2: quads_by_collection @@ -69,11 +70,11 @@ CREATE TABLE quads_by_collection ( otype text, -- 'U' (URI), 'L' (literal), 'T' (triple/reification) dtype text, -- XSD datatype (when otype = 'L') lang text, -- Language tag (when otype = 'L') - PRIMARY KEY (collection, d, s, p, o) + PRIMARY KEY (collection, d, s, p, o, otype, dtype, lang) ); ``` -Clustered by dataset first, enabling deletion at either collection or dataset granularity. +Clustered by dataset first, enabling deletion at either collection or dataset granularity. The `otype`, `dtype`, and `lang` columns are included in the clustering key to distinguish literals with the same value but different type metadata — in RDF, `"thing"`, `"thing"@en`, and `"thing"^^xsd:string` are semantically distinct values. ## Write Path diff --git a/tests/unit/test_direct/test_entity_centric_kg.py b/tests/unit/test_direct/test_entity_centric_kg.py index 72c66a42..4a74b35b 100644 --- a/tests/unit/test_direct/test_entity_centric_kg.py +++ b/tests/unit/test_direct/test_entity_centric_kg.py @@ -305,9 +305,8 @@ class TestEntityCentricKnowledgeGraph: mock_session.execute.assert_called() - def test_graph_wildcard_returns_all_graphs(self, entity_kg): - """Test that g='*' returns quads from all graphs""" - from trustgraph.direct.cassandra_kg import GRAPH_WILDCARD + def test_graph_none_returns_all_graphs(self, entity_kg): + """Test that g=None returns quads from all graphs""" kg, mock_session = entity_kg mock_result = [ @@ -320,7 +319,7 @@ class TestEntityCentricKnowledgeGraph: ] mock_session.execute.return_value = mock_result - results = kg.get_s('test_collection', 'http://example.org/Alice', g=GRAPH_WILDCARD) + results = kg.get_s('test_collection', 'http://example.org/Alice', g=None) # Should return quads from both graphs assert len(results) == 2 diff --git a/trustgraph-cli/trustgraph/cli/query_graph.py b/trustgraph-cli/trustgraph/cli/query_graph.py index feb81691..a123e632 100644 --- a/trustgraph-cli/trustgraph/cli/query_graph.py +++ b/trustgraph-cli/trustgraph/cli/query_graph.py @@ -186,6 +186,12 @@ def build_quoted_triple_term(qt_subject, qt_subject_type, def format_term(term_dict): """Format a term dict for display in space/pipe output formats. + Handles multiple wire format styles: + - Short form (send): {"t": "i", "i": "..."}, {"t": "l", "v": "..."} + - Long form (receive): {"type": "i", "iri": "..."}, {"type": "l", "value": "..."} + - Raw quoted triple: {"s": {...}, "p": {...}, "o": {...}} (no type wrapper) + - Stringified quoted triple in IRI: {"t": "i", "i": "{\"s\":...}"} (backend quirk) + Args: term_dict: Wire-format term dict @@ -195,25 +201,53 @@ def format_term(term_dict): if not term_dict: return "" - t = term_dict.get("t") + # Get type - handle both short and long form + t = term_dict.get("t") or term_dict.get("type") + if t == "i": - return term_dict.get("i", "") + # IRI - handle both "i" and "iri" keys + iri_value = term_dict.get("i") or term_dict.get("iri", "") + # Check if IRI value is actually a stringified quoted triple (backend quirk) + if iri_value.startswith('{"s":') or iri_value.startswith("{\"s\":"): + try: + parsed = json.loads(iri_value) + if "s" in parsed and "p" in parsed and "o" in parsed: + # It's a stringified quoted triple - format it properly + s = format_term(parsed.get("s", {})) + p = format_term(parsed.get("p", {})) + o = format_term(parsed.get("o", {})) + return f"<<{s} {p} {o}>>" + except json.JSONDecodeError: + pass # Not valid JSON, treat as regular IRI + return iri_value elif t == "l": - value = term_dict.get("v", "") - # Quote literals and show language/datatype if present + # Literal - handle both short and long form keys + value = term_dict.get("v") or term_dict.get("value", "") result = f'"{value}"' - if "ln" in term_dict: - result += f'@{term_dict["ln"]}' - elif "dt" in term_dict: - result += f'^^{term_dict["dt"]}' + # Language tag + lang = term_dict.get("ln") or term_dict.get("language") + if lang: + result += f'@{lang}' + else: + # Datatype + dt = term_dict.get("dt") or term_dict.get("datatype") + if dt: + result += f'^^{dt}' return result elif t == "t": - # Format quoted triple as <> - tr = term_dict.get("tr", {}) + # Quoted triple - handle both "tr" and "triple" keys + tr = term_dict.get("tr") or term_dict.get("triple", {}) s = format_term(tr.get("s", {})) p = format_term(tr.get("p", {})) o = format_term(tr.get("o", {})) return f"<<{s} {p} {o}>>" + elif t is None and "s" in term_dict and "p" in term_dict and "o" in term_dict: + # Raw quoted triple without type wrapper (has s, p, o keys directly) + s = format_term(term_dict.get("s", {})) + p = format_term(term_dict.get("p", {})) + o = format_term(term_dict.get("o", {})) + return f"<<{s} {p} {o}>>" + return str(term_dict) @@ -526,8 +560,9 @@ def main(): else: obj_term = None - # Graph is always an IRI - graph_term = build_term(args.graph, term_type='iri') if args.graph else None + # Graph is a plain IRI string, not a Term + # None = all graphs, "" = default graph only, "uri" = specific graph + graph_value = args.graph query_graph( url=args.api_url, @@ -539,7 +574,7 @@ def main(): subject=subject_term, predicate=predicate_term, obj=obj_term, - graph=graph_term, + graph=graph_value, output_format=args.format, headers=args.headers, token=args.token, diff --git a/trustgraph-flow/trustgraph/direct/cassandra_kg.py b/trustgraph-flow/trustgraph/direct/cassandra_kg.py index 61639096..59d2a2a1 100644 --- a/trustgraph-flow/trustgraph/direct/cassandra_kg.py +++ b/trustgraph-flow/trustgraph/direct/cassandra_kg.py @@ -589,6 +589,8 @@ class EntityCentricKnowledgeGraph: # quads_by_entity: primary data table # Every entity has a partition containing all quads it participates in + # Clustering key includes dtype/lang to distinguish literals with same value + # but different datatype or language tag (e.g., "thing" vs "thing"@en) self.session.execute(f""" CREATE TABLE IF NOT EXISTS {self.entity_table} ( collection text, @@ -601,11 +603,13 @@ class EntityCentricKnowledgeGraph: d text, dtype text, lang text, - PRIMARY KEY ((collection, entity), role, p, otype, s, o, d) + PRIMARY KEY ((collection, entity), role, p, otype, s, o, d, dtype, lang) ); """) # quads_by_collection: manifest for collection-level queries and deletion + # Clustering key includes otype/dtype/lang to distinguish literals with same + # value but different metadata (e.g., "thing" vs "thing"@en vs "thing"^^xsd:string) self.session.execute(f""" CREATE TABLE IF NOT EXISTS {self.collection_table} ( collection text, @@ -616,7 +620,7 @@ class EntityCentricKnowledgeGraph: otype text, dtype text, lang text, - PRIMARY KEY (collection, d, s, p, o) + PRIMARY KEY (collection, d, s, p, o, otype, dtype, lang) ); """) @@ -718,7 +722,7 @@ class EntityCentricKnowledgeGraph: ) self.delete_collection_row_stmt = self.session.prepare( - f"DELETE FROM {self.collection_table} WHERE collection = ? AND d = ? AND s = ? AND p = ? AND o = ?" + f"DELETE FROM {self.collection_table} WHERE collection = ? AND d = ? AND s = ? AND p = ? AND o = ? AND otype = ? AND dtype = ? AND lang = ?" ) logger.info("Prepared statements initialized for entity-centric schema") @@ -797,7 +801,7 @@ class EntityCentricKnowledgeGraph: def get_s(self, collection, s, g=None, limit=10): """ Query by subject. Returns quads where s is the subject. - g=None: default graph, g='*': all graphs + g=None: all graphs, g='': default graph only, g='uri': specific graph """ rows = self.session.execute(self.get_entity_as_s_stmt, (collection, s, limit)) @@ -805,10 +809,7 @@ class EntityCentricKnowledgeGraph: for row in rows: d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH # Filter by graph if specified - if g is None or g == DEFAULT_GRAPH: - if d != DEFAULT_GRAPH: - continue - elif g != GRAPH_WILDCARD and d != g: + if g is not None and d != g: continue results.append(QuadResult( @@ -819,16 +820,13 @@ class EntityCentricKnowledgeGraph: return results def get_p(self, collection, p, g=None, limit=10): - """Query by predicate""" + """Query by predicate. g=None: all graphs, g='': default graph only""" rows = self.session.execute(self.get_entity_as_p_stmt, (collection, p, limit)) results = [] for row in rows: d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH - if g is None or g == DEFAULT_GRAPH: - if d != DEFAULT_GRAPH: - continue - elif g != GRAPH_WILDCARD and d != g: + if g is not None and d != g: continue results.append(QuadResult( @@ -839,16 +837,13 @@ class EntityCentricKnowledgeGraph: return results def get_o(self, collection, o, g=None, limit=10): - """Query by object""" + """Query by object. g=None: all graphs, g='': default graph only""" rows = self.session.execute(self.get_entity_as_o_stmt, (collection, o, limit)) results = [] for row in rows: d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH - if g is None or g == DEFAULT_GRAPH: - if d != DEFAULT_GRAPH: - continue - elif g != GRAPH_WILDCARD and d != g: + if g is not None and d != g: continue results.append(QuadResult( @@ -859,16 +854,13 @@ class EntityCentricKnowledgeGraph: return results def get_sp(self, collection, s, p, g=None, limit=10): - """Query by subject and predicate""" + """Query by subject and predicate. g=None: all graphs, g='': default graph only""" rows = self.session.execute(self.get_entity_as_s_p_stmt, (collection, s, p, limit)) results = [] for row in rows: d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH - if g is None or g == DEFAULT_GRAPH: - if d != DEFAULT_GRAPH: - continue - elif g != GRAPH_WILDCARD and d != g: + if g is not None and d != g: continue results.append(QuadResult( @@ -879,16 +871,13 @@ class EntityCentricKnowledgeGraph: return results def get_po(self, collection, p, o, g=None, limit=10): - """Query by predicate and object""" + """Query by predicate and object. g=None: all graphs, g='': default graph only""" rows = self.session.execute(self.get_entity_as_o_p_stmt, (collection, o, p, limit)) results = [] for row in rows: d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH - if g is None or g == DEFAULT_GRAPH: - if d != DEFAULT_GRAPH: - continue - elif g != GRAPH_WILDCARD and d != g: + if g is not None and d != g: continue results.append(QuadResult( @@ -899,7 +888,7 @@ class EntityCentricKnowledgeGraph: return results def get_os(self, collection, o, s, g=None, limit=10): - """Query by object and subject""" + """Query by object and subject. g=None: all graphs, g='': default graph only""" # Use subject partition with role='S', filter by o rows = self.session.execute(self.get_entity_as_s_stmt, (collection, s, limit)) @@ -909,10 +898,7 @@ class EntityCentricKnowledgeGraph: continue d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH - if g is None or g == DEFAULT_GRAPH: - if d != DEFAULT_GRAPH: - continue - elif g != GRAPH_WILDCARD and d != g: + if g is not None and d != g: continue results.append(QuadResult( @@ -923,7 +909,7 @@ class EntityCentricKnowledgeGraph: return results def get_spo(self, collection, s, p, o, g=None, limit=10): - """Query by subject, predicate, object (find which graphs)""" + """Query by subject, predicate, object (find which graphs). g=None: all graphs, g='': default graph only""" rows = self.session.execute(self.get_entity_as_s_p_stmt, (collection, s, p, limit)) results = [] @@ -932,10 +918,7 @@ class EntityCentricKnowledgeGraph: continue d = row.d if hasattr(row, 'd') else DEFAULT_GRAPH - if g is None or g == DEFAULT_GRAPH: - if d != DEFAULT_GRAPH: - continue - elif g != GRAPH_WILDCARD and d != g: + if g is not None and d != g: continue results.append(QuadResult( @@ -991,9 +974,9 @@ class EntityCentricKnowledgeGraph: 3. Delete entire entity partitions 4. Delete collection rows """ - # Read all quads from collection table + # Read all quads from collection table (including type metadata for delete) rows = self.session.execute( - f"SELECT d, s, p, o, otype FROM {self.collection_table} WHERE collection = %s", + f"SELECT d, s, p, o, otype, dtype, lang FROM {self.collection_table} WHERE collection = %s", (collection,) ) @@ -1002,8 +985,11 @@ class EntityCentricKnowledgeGraph: quads = [] for row in rows: - d, s, p, o, otype = row.d, row.s, row.p, row.o, row.otype - quads.append((d, s, p, o)) + d, s, p, o = row.d, row.s, row.p, row.o + otype = row.otype + dtype = row.dtype if hasattr(row, 'dtype') else '' + lang = row.lang if hasattr(row, 'lang') else '' + quads.append((d, s, p, o, otype, dtype, lang)) # Subject and predicate are always entities entities.add(s) @@ -1038,8 +1024,8 @@ class EntityCentricKnowledgeGraph: batch = BatchStatement() count = 0 - for d, s, p, o in quads: - batch.add(self.delete_collection_row_stmt, (collection, d, s, p, o)) + for d, s, p, o, otype, dtype, lang in quads: + batch.add(self.delete_collection_row_stmt, (collection, d, s, p, o, otype, dtype, lang)) count += 1 # Execute batch every 50 quads diff --git a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py index 1bb88f21..f1f5ba60 100755 --- a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py @@ -10,7 +10,7 @@ import json from cassandra.query import SimpleStatement from .... direct.cassandra_kg import ( - EntityCentricKnowledgeGraph, GRAPH_WILDCARD, DEFAULT_GRAPH + EntityCentricKnowledgeGraph, DEFAULT_GRAPH ) from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error from .... schema import Term, Triple, IRI, LITERAL, TRIPLE, BLANK @@ -304,6 +304,13 @@ class Processor(TriplesQueryService): for t in resp: # Note: quads_by_collection uses 'd' for graph field g = t.d if hasattr(t, 'd') else DEFAULT_GRAPH + # Filter by graph + # g_val=None means all graphs (no filter) + # g_val="" means default graph only + # otherwise filter to specific named graph + if g_val is not None: + if g != g_val: + continue term_type, datatype, language = get_object_metadata(t) quads.append((t.s, t.p, t.o, g, term_type, datatype, language)) @@ -379,6 +386,15 @@ class Processor(TriplesQueryService): break g = row.d if hasattr(row, 'd') else DEFAULT_GRAPH + + # Filter by graph + # g_val=None means all graphs (no filter) + # g_val="" means default graph only + # otherwise filter to specific named graph + if g_val is not None: + if g != g_val: + continue + term_type, datatype, language = get_object_metadata(row) # s and p are always IRIs in RDF