diff --git a/trustgraph-base/trustgraph/api/flow.py b/trustgraph-base/trustgraph/api/flow.py index cc07f794..0d34104c 100644 --- a/trustgraph-base/trustgraph/api/flow.py +++ b/trustgraph-base/trustgraph/api/flow.py @@ -9,26 +9,45 @@ including LLM operations, RAG queries, knowledge graph management, and more. import json import base64 -from .. knowledge import hash, Uri, Literal -from .. schema import IRI, LITERAL +from .. knowledge import hash, Uri, Literal, QuotedTriple +from .. schema import IRI, LITERAL, TRIPLE from . types import Triple from . exceptions import ProtocolException def to_value(x): - """Convert wire format to Uri or Literal.""" + """Convert wire format to Uri, Literal, or QuotedTriple.""" if x.get("t") == IRI: return Uri(x.get("i", "")) elif x.get("t") == LITERAL: return Literal(x.get("v", "")) + elif x.get("t") == TRIPLE: + # Wire format uses "tr" key for nested triple dict + triple_data = x.get("tr") + if triple_data: + return QuotedTriple( + s=to_value(triple_data.get("s", {})), + p=to_value(triple_data.get("p", {})), + o=to_value(triple_data.get("o", {})), + ) + return Literal("") # Fallback for any other type return Literal(x.get("v", x.get("i", ""))) def from_value(v): - """Convert Uri or Literal to wire format.""" + """Convert Uri, Literal, or QuotedTriple to wire format.""" if isinstance(v, Uri): return {"t": IRI, "i": str(v)} + elif isinstance(v, QuotedTriple): + return { + "t": TRIPLE, + "tr": { + "s": from_value(v.s), + "p": from_value(v.p), + "o": from_value(v.o), + } + } else: return {"t": LITERAL, "v": str(v)} diff --git a/trustgraph-base/trustgraph/api/knowledge.py b/trustgraph-base/trustgraph/api/knowledge.py index 1fae350c..84f98918 100644 --- a/trustgraph-base/trustgraph/api/knowledge.py +++ b/trustgraph-base/trustgraph/api/knowledge.py @@ -9,17 +9,27 @@ into flows for use in queries and RAG operations. import json import base64 -from .. knowledge import hash, Uri, Literal -from .. schema import IRI, LITERAL +from .. knowledge import hash, Uri, Literal, QuotedTriple +from .. schema import IRI, LITERAL, TRIPLE from . types import Triple def to_value(x): - """Convert wire format to Uri or Literal.""" + """Convert wire format to Uri, Literal, or QuotedTriple.""" if x.get("t") == IRI: return Uri(x.get("i", "")) elif x.get("t") == LITERAL: return Literal(x.get("v", "")) + elif x.get("t") == TRIPLE: + # Wire format uses "tr" key for nested triple dict + triple_data = x.get("tr") + if triple_data: + return QuotedTriple( + s=to_value(triple_data.get("s", {})), + p=to_value(triple_data.get("p", {})), + o=to_value(triple_data.get("o", {})), + ) + return Literal("") # Fallback for any other type return Literal(x.get("v", x.get("i", ""))) diff --git a/trustgraph-base/trustgraph/api/library.py b/trustgraph-base/trustgraph/api/library.py index b61348fc..396d64e0 100644 --- a/trustgraph-base/trustgraph/api/library.py +++ b/trustgraph-base/trustgraph/api/library.py @@ -12,8 +12,8 @@ import base64 import logging from . types import DocumentMetadata, ProcessingMetadata, Triple -from .. knowledge import hash, Uri, Literal -from .. schema import IRI, LITERAL +from .. knowledge import hash, Uri, Literal, QuotedTriple +from .. schema import IRI, LITERAL, TRIPLE from . exceptions import * logger = logging.getLogger(__name__) @@ -27,19 +27,38 @@ DEFAULT_CHUNK_SIZE = 5 * 1024 * 1024 def to_value(x): - """Convert wire format to Uri or Literal.""" + """Convert wire format to Uri, Literal, or QuotedTriple.""" if x.get("t") == IRI: return Uri(x.get("i", "")) elif x.get("t") == LITERAL: return Literal(x.get("v", "")) + elif x.get("t") == TRIPLE: + # Wire format uses "tr" key for nested triple dict + triple_data = x.get("tr") + if triple_data: + return QuotedTriple( + s=to_value(triple_data.get("s", {})), + p=to_value(triple_data.get("p", {})), + o=to_value(triple_data.get("o", {})), + ) + return Literal("") # Fallback for any other type return Literal(x.get("v", x.get("i", ""))) def from_value(v): - """Convert Uri or Literal to wire format.""" + """Convert Uri, Literal, or QuotedTriple to wire format.""" if isinstance(v, Uri): return {"t": IRI, "i": str(v)} + elif isinstance(v, QuotedTriple): + return { + "t": TRIPLE, + "tr": { + "s": from_value(v.s), + "p": from_value(v.p), + "o": from_value(v.o), + } + } else: return {"t": LITERAL, "v": str(v)} diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index c6248622..a3c3debd 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -12,7 +12,7 @@ import logging import base64 import types from dataclasses import asdict, is_dataclass -from typing import Any +from typing import Any, get_type_hints from .backend import PubSubBackend, BackendProducer, BackendConsumer, Message @@ -58,6 +58,7 @@ def dict_to_dataclass(data: dict, cls: type) -> Any: Convert a dictionary back to a dataclass instance. Handles nested dataclasses and missing fields. + Uses get_type_hints() to resolve forward references (string annotations). """ if data is None: return None @@ -65,8 +66,13 @@ def dict_to_dataclass(data: dict, cls: type) -> Any: if not is_dataclass(cls): return data - # Get field types from the dataclass - field_types = {f.name: f.type for f in cls.__dataclass_fields__.values()} + # Get field types from the dataclass, resolving forward references + # get_type_hints() evaluates string annotations like "Triple | None" + try: + field_types = get_type_hints(cls) + except Exception: + # Fallback if get_type_hints fails (shouldn't happen normally) + field_types = {f.name: f.type for f in cls.__dataclass_fields__.values()} kwargs = {} for key, value in data.items(): diff --git a/trustgraph-base/trustgraph/knowledge/defs.py b/trustgraph-base/trustgraph/knowledge/defs.py index d6290930..e8157586 100644 --- a/trustgraph-base/trustgraph/knowledge/defs.py +++ b/trustgraph-base/trustgraph/knowledge/defs.py @@ -26,8 +26,40 @@ KEYWORD = 'https://schema.org/keywords' class Uri(str): def is_uri(self): return True def is_literal(self): return False + def is_triple(self): return False class Literal(str): def is_uri(self): return False def is_literal(self): return True + def is_triple(self): return False + +class QuotedTriple: + """ + RDF-star quoted triple (reification). + + Represents a triple that can be used as the object of another triple, + enabling statements about statements. + + Example: + # stmt:123 tg:reifies <<:Hope skos:definition "A feeling...">> + qt = QuotedTriple( + s=Uri("https://example.org/Hope"), + p=Uri("http://www.w3.org/2004/02/skos/core#definition"), + o=Literal("A feeling of expectation") + ) + """ + def __init__(self, s, p, o): + self.s = s # Uri, Literal, or QuotedTriple + self.p = p # Uri + self.o = o # Uri, Literal, or QuotedTriple + + def is_uri(self): return False + def is_literal(self): return False + def is_triple(self): return True + + def __repr__(self): + return f"<<{self.s} {self.p} {self.o}>>" + + def __str__(self): + return f"<<{self.s} {self.p} {self.o}>>" diff --git a/trustgraph-base/trustgraph/messaging/translators/primitives.py b/trustgraph-base/trustgraph/messaging/translators/primitives.py index 790ae8f7..d54efc49 100644 --- a/trustgraph-base/trustgraph/messaging/translators/primitives.py +++ b/trustgraph-base/trustgraph/messaging/translators/primitives.py @@ -82,6 +82,7 @@ def _triple_translator_to_pulsar(data: Dict[str, Any]) -> Triple: def _triple_translator_from_pulsar(obj: Triple) -> Dict[str, Any]: + """Convert Triple object to wire format dict.""" term_translator = TermTranslator() result: Dict[str, Any] = {} diff --git a/trustgraph-base/trustgraph/provenance/namespaces.py b/trustgraph-base/trustgraph/provenance/namespaces.py index 9801f34c..b207b38f 100644 --- a/trustgraph-base/trustgraph/provenance/namespaces.py +++ b/trustgraph-base/trustgraph/provenance/namespaces.py @@ -28,6 +28,18 @@ RDF_TYPE = RDF + "type" RDFS = "http://www.w3.org/2000/01/rdf-schema#" RDFS_LABEL = RDFS + "label" +# Schema.org namespace +SCHEMA = "https://schema.org/" +SCHEMA_SUBJECT_OF = SCHEMA + "subjectOf" +SCHEMA_DIGITAL_DOCUMENT = SCHEMA + "DigitalDocument" +SCHEMA_DESCRIPTION = SCHEMA + "description" +SCHEMA_KEYWORDS = SCHEMA + "keywords" +SCHEMA_NAME = SCHEMA + "name" + +# SKOS namespace +SKOS = "http://www.w3.org/2004/02/skos/core#" +SKOS_DEFINITION = SKOS + "definition" + # TrustGraph namespace for custom predicates TG = "https://trustgraph.ai/ns/" TG_REIFIES = TG + "reifies" diff --git a/trustgraph-base/trustgraph/provenance/triples.py b/trustgraph-base/trustgraph/provenance/triples.py index 5e4dc774..cbb0e420 100644 --- a/trustgraph-base/trustgraph/provenance/triples.py +++ b/trustgraph-base/trustgraph/provenance/triples.py @@ -5,7 +5,7 @@ Helper functions to build PROV-O triples for extraction-time provenance. from datetime import datetime from typing import List, Optional -from .. schema import Triple, Term, IRI, LITERAL +from .. schema import Triple, Term, IRI, LITERAL, TRIPLE from . namespaces import ( RDF_TYPE, RDFS_LABEL, @@ -145,6 +145,7 @@ def derived_entity_triples( # Activity declaration _triple(act_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), + _triple(act_uri, RDFS_LABEL, _literal(f"{component_name} extraction")), _triple(act_uri, PROV_USED, _iri(parent_uri)), _triple(act_uri, PROV_WAS_ASSOCIATED_WITH, _iri(agt_uri)), _triple(act_uri, PROV_STARTED_AT_TIME, _literal(timestamp)), @@ -181,9 +182,7 @@ def derived_entity_triples( def triple_provenance_triples( stmt_uri: str, - subject_uri: str, - predicate_uri: str, - object_term: Term, + extracted_triple: Triple, chunk_uri: str, component_name: str, component_version: str, @@ -195,15 +194,13 @@ def triple_provenance_triples( Build provenance triples for an extracted knowledge triple using reification. Creates: - - Statement object that reifies the triple + - Reification triple: stmt_uri tg:reifies <> - wasDerivedFrom link to source chunk - Activity and agent metadata Args: stmt_uri: URI for the reified statement - subject_uri: Subject of the extracted triple - predicate_uri: Predicate of the extracted triple - object_term: Object of the extracted triple (Term) + extracted_triple: The extracted Triple to reify chunk_uri: URI of source chunk component_name: Name of extractor component component_version: Version of the component @@ -212,7 +209,7 @@ def triple_provenance_triples( timestamp: ISO timestamp Returns: - List of Triple objects for the provenance (not the triple itself) + List of Triple objects for the provenance (including reification) """ if timestamp is None: timestamp = datetime.utcnow().isoformat() + "Z" @@ -220,18 +217,24 @@ def triple_provenance_triples( act_uri = activity_uri() agt_uri = agent_uri(component_name) - # Note: The actual reification (tg:reifies pointing at the edge) requires - # RDF 1.2 triple term support. This builds the surrounding provenance. - # The actual reification link must be handled by the knowledge extractor - # using the graph store's reification API. + # Create the quoted triple term (RDF-star reification) + triple_term = Term(type=TRIPLE, triple=extracted_triple) triples = [ + # Reification: stmt_uri tg:reifies <> + Triple( + s=_iri(stmt_uri), + p=_iri(TG_REIFIES), + o=triple_term + ), + # Statement provenance _triple(stmt_uri, PROV_WAS_DERIVED_FROM, _iri(chunk_uri)), _triple(stmt_uri, PROV_WAS_GENERATED_BY, _iri(act_uri)), # Activity _triple(act_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), + _triple(act_uri, RDFS_LABEL, _literal(f"{component_name} extraction")), _triple(act_uri, PROV_USED, _iri(chunk_uri)), _triple(act_uri, PROV_WAS_ASSOCIATED_WITH, _iri(agt_uri)), _triple(act_uri, PROV_STARTED_AT_TIME, _literal(timestamp)), diff --git a/trustgraph-base/trustgraph/provenance/uris.py b/trustgraph-base/trustgraph/provenance/uris.py index ff5570e4..33b00bfd 100644 --- a/trustgraph-base/trustgraph/provenance/uris.py +++ b/trustgraph-base/trustgraph/provenance/uris.py @@ -1,11 +1,12 @@ """ URI generation for provenance entities. -URI patterns: -- Document: https://trustgraph.ai/doc/{doc_id} -- Page: https://trustgraph.ai/page/{doc_id}/p{page_number} -- Chunk: https://trustgraph.ai/chunk/{doc_id}/p{page}/c{chunk} (from page) - https://trustgraph.ai/chunk/{doc_id}/c{chunk} (from text doc) +Document IDs are already IRIs (e.g., https://trustgraph.ai/doc/abc123). +Child entities (pages, chunks) append path segments to the parent IRI: +- Document: {doc_iri} (as provided) +- Page: {doc_iri}/p{page_number} +- Chunk: {page_iri}/c{chunk_index} (from page) + {doc_iri}/c{chunk_index} (from text doc) - Activity: https://trustgraph.ai/activity/{uuid} - Statement: https://trustgraph.ai/stmt/{uuid} """ @@ -13,7 +14,7 @@ URI patterns: import uuid import urllib.parse -# Base URI prefix +# Base URI prefix for generated URIs (activities, statements, agents) TRUSTGRAPH_BASE = "https://trustgraph.ai" @@ -22,24 +23,24 @@ def _encode_id(id_str: str) -> str: return urllib.parse.quote(str(id_str), safe='') -def document_uri(doc_id: str) -> str: - """Generate URI for a source document.""" - return f"{TRUSTGRAPH_BASE}/doc/{_encode_id(doc_id)}" +def document_uri(doc_iri: str) -> str: + """Return the document IRI as-is (already a full URI).""" + return doc_iri -def page_uri(doc_id: str, page_number: int) -> str: - """Generate URI for a page extracted from a document.""" - return f"{TRUSTGRAPH_BASE}/page/{_encode_id(doc_id)}/p{page_number}" +def page_uri(doc_iri: str, page_number: int) -> str: + """Generate URI for a page by appending to document IRI.""" + return f"{doc_iri}/p{page_number}" -def chunk_uri_from_page(doc_id: str, page_number: int, chunk_index: int) -> str: +def chunk_uri_from_page(doc_iri: str, page_number: int, chunk_index: int) -> str: """Generate URI for a chunk extracted from a page.""" - return f"{TRUSTGRAPH_BASE}/chunk/{_encode_id(doc_id)}/p{page_number}/c{chunk_index}" + return f"{doc_iri}/p{page_number}/c{chunk_index}" -def chunk_uri_from_doc(doc_id: str, chunk_index: int) -> str: +def chunk_uri_from_doc(doc_iri: str, chunk_index: int) -> str: """Generate URI for a chunk extracted directly from a text document.""" - return f"{TRUSTGRAPH_BASE}/chunk/{_encode_id(doc_id)}/c{chunk_index}" + return f"{doc_iri}/c{chunk_index}" def activity_uri(activity_id: str = None) -> str: diff --git a/trustgraph-base/trustgraph/provenance/vocabulary.py b/trustgraph-base/trustgraph/provenance/vocabulary.py index 0124e0cd..47164da8 100644 --- a/trustgraph-base/trustgraph/provenance/vocabulary.py +++ b/trustgraph-base/trustgraph/provenance/vocabulary.py @@ -16,6 +16,9 @@ from . namespaces import ( PROV_WAS_DERIVED_FROM, PROV_WAS_GENERATED_BY, PROV_USED, PROV_WAS_ASSOCIATED_WITH, PROV_STARTED_AT_TIME, DC_TITLE, DC_SOURCE, DC_DATE, DC_CREATOR, + SCHEMA_SUBJECT_OF, SCHEMA_DIGITAL_DOCUMENT, SCHEMA_DESCRIPTION, + SCHEMA_KEYWORDS, SCHEMA_NAME, + SKOS_DEFINITION, TG_REIFIES, TG_PAGE_COUNT, TG_MIME_TYPE, TG_PAGE_NUMBER, TG_CHUNK_INDEX, TG_CHAR_OFFSET, TG_CHAR_LENGTH, TG_CHUNK_SIZE, TG_CHUNK_OVERLAP, TG_COMPONENT_VERSION, @@ -57,6 +60,20 @@ DC_PREDICATE_LABELS = [ _label_triple(DC_CREATOR, "creator"), ] +# Schema.org labels +SCHEMA_LABELS = [ + _label_triple(SCHEMA_SUBJECT_OF, "subject of"), + _label_triple(SCHEMA_DIGITAL_DOCUMENT, "Digital Document"), + _label_triple(SCHEMA_DESCRIPTION, "description"), + _label_triple(SCHEMA_KEYWORDS, "keywords"), + _label_triple(SCHEMA_NAME, "name"), +] + +# SKOS labels +SKOS_LABELS = [ + _label_triple(SKOS_DEFINITION, "definition"), +] + # TrustGraph predicate labels TG_PREDICATE_LABELS = [ _label_triple(TG_REIFIES, "reifies"), @@ -97,5 +114,7 @@ def get_vocabulary_triples() -> List[Triple]: PROV_CLASS_LABELS + PROV_PREDICATE_LABELS + DC_PREDICATE_LABELS + + SCHEMA_LABELS + + SKOS_LABELS + TG_PREDICATE_LABELS ) diff --git a/trustgraph-cli/trustgraph/cli/graph_to_turtle.py b/trustgraph-cli/trustgraph/cli/graph_to_turtle.py index 1d34e39f..f42fe140 100644 --- a/trustgraph-cli/trustgraph/cli/graph_to_turtle.py +++ b/trustgraph-cli/trustgraph/cli/graph_to_turtle.py @@ -1,6 +1,6 @@ """ Connects to the graph query service and dumps all graph edges in Turtle -format. +format with RDF-star support for quoted triples. """ import rdflib @@ -10,11 +10,37 @@ import argparse import os from trustgraph.api import Api, Uri +from trustgraph.knowledge import QuotedTriple default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') default_user = 'trustgraph' default_collection = 'default' + +def value_to_rdflib(val): + """Convert a TrustGraph value to an rdflib term.""" + if isinstance(val, Uri): + # Skip malformed URLs with spaces + if " " in val: + return None + return rdflib.term.URIRef(val) + elif isinstance(val, QuotedTriple): + # RDF-star quoted triple + s_term = value_to_rdflib(val.s) + p_term = value_to_rdflib(val.p) + o_term = value_to_rdflib(val.o) + if s_term is None or p_term is None or o_term is None: + return None + # rdflib 6.x+ supports Triple as a term type + try: + return rdflib.term.Triple((s_term, p_term, o_term)) + except AttributeError: + # Fallback for older rdflib versions - represent as string + return rdflib.term.Literal(f"<<{val.s} {val.p} {val.o}>>") + else: + return rdflib.term.Literal(str(val)) + + def show_graph(url, flow_id, user, collection): api = Api(url).flow().id(flow_id) @@ -30,18 +56,10 @@ def show_graph(url, flow_id, user, collection): sv = rdflib.term.URIRef(row.s) pv = rdflib.term.URIRef(row.p) + ov = value_to_rdflib(row.o) - if isinstance(row.o, Uri): - - # Skip malformed URLs with spaces in - if " " in row.o: - continue - - ov = rdflib.term.URIRef(row.o) - - else: - - ov = rdflib.term.Literal(row.o) + if ov is None: + continue g.add((sv, pv, ov)) diff --git a/trustgraph-flow/trustgraph/direct/milvus_graph_embeddings.py b/trustgraph-flow/trustgraph/direct/milvus_graph_embeddings.py index 4a106f27..dcbf6734 100644 --- a/trustgraph-flow/trustgraph/direct/milvus_graph_embeddings.py +++ b/trustgraph-flow/trustgraph/direct/milvus_graph_embeddings.py @@ -90,8 +90,14 @@ class EntityVectors: max_length=65535, ) + chunk_id_field = FieldSchema( + name="chunk_id", + dtype=DataType.VARCHAR, + max_length=65535, + ) + schema = CollectionSchema( - fields = [pkey_field, vec_field, entity_field], + fields = [pkey_field, vec_field, entity_field, chunk_id_field], description = "Graph embedding schema", ) @@ -119,17 +125,18 @@ class EntityVectors: self.collections[(dimension, user, collection)] = collection_name logger.info(f"Created Milvus collection {collection_name} with dimension {dimension}") - def insert(self, embeds, entity, user, collection): + def insert(self, embeds, entity, user, collection, chunk_id=""): dim = len(embeds) if (dim, user, collection) not in self.collections: self.init_collection(dim, user, collection) - + data = [ { "vector": embeds, "entity": entity, + "chunk_id": chunk_id, } ] diff --git a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py index 578f519d..b91d30ed 100755 --- a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py @@ -18,7 +18,10 @@ from .... schema import PromptRequest, PromptResponse from .... rdf import TRUSTGRAPH_ENTITIES, DEFINITION, RDF_LABEL, SUBJECT_OF from .... base import FlowProcessor, ConsumerSpec, ProducerSpec -from .... base import PromptClientSpec +from .... base import PromptClientSpec, ParameterSpec + +from .... provenance import statement_uri, triple_provenance_triples +from .... flow_version import __version__ as COMPONENT_VERSION DEFINITION_VALUE = Term(type=IRI, iri=DEFINITION) RDF_LABEL_VALUE = Term(type=IRI, iri=RDF_LABEL) @@ -75,6 +78,10 @@ class Processor(FlowProcessor): ) ) + # Optional flow parameters for provenance + self.register_specification(ParameterSpec("llm-model")) + self.register_specification(ParameterSpec("ontology")) + def to_uri(self, text): part = text.replace(" ", "-").lower().encode("utf-8") @@ -132,6 +139,10 @@ class Processor(FlowProcessor): chunk_doc_id = v.document_id if v.document_id else v.metadata.id chunk_uri = v.metadata.id # The URI form for the chunk + # Get optional provenance parameters + llm_model = flow("llm-model") + ontology_uri = flow("ontology") + # Note: Document metadata is now emitted once by librarian at processing # initiation, so we don't need to duplicate it here. @@ -157,9 +168,24 @@ class Processor(FlowProcessor): o=Term(type=LITERAL, value=s), )) - triples.append(Triple( + # The definition triple - this is the main extracted fact + definition_triple = Triple( s=s_value, p=DEFINITION_VALUE, o=o_value - )) + ) + triples.append(definition_triple) + + # Generate provenance for the definition triple (reification) + stmt_uri = statement_uri() + prov_triples = triple_provenance_triples( + stmt_uri=stmt_uri, + extracted_triple=definition_triple, + chunk_uri=chunk_uri, + component_name=default_ident, + component_version=COMPONENT_VERSION, + llm_model=llm_model, + ontology_uri=ontology_uri, + ) + triples.extend(prov_triples) # Link entity to chunk (not top-level document) triples.append(Triple( diff --git a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py index 1a719500..7bda91eb 100755 --- a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py @@ -18,7 +18,10 @@ from .... schema import PromptRequest, PromptResponse from .... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES, SUBJECT_OF from .... base import FlowProcessor, ConsumerSpec, ProducerSpec -from .... base import PromptClientSpec +from .... base import PromptClientSpec, ParameterSpec + +from .... provenance import statement_uri, triple_provenance_triples +from .... flow_version import __version__ as COMPONENT_VERSION RDF_LABEL_VALUE = Term(type=IRI, iri=RDF_LABEL) SUBJECT_OF_VALUE = Term(type=IRI, iri=SUBJECT_OF) @@ -65,6 +68,10 @@ class Processor(FlowProcessor): ) ) + # Optional flow parameters for provenance + self.register_specification(ParameterSpec("llm-model")) + self.register_specification(ParameterSpec("ontology")) + def to_uri(self, text): part = text.replace(" ", "-").lower().encode("utf-8") @@ -113,6 +120,10 @@ class Processor(FlowProcessor): chunk_doc_id = v.document_id if v.document_id else v.metadata.id chunk_uri = v.metadata.id # The URI form for the chunk + # Get optional provenance parameters + llm_model = flow("llm-model") + ontology_uri = flow("ontology") + # Note: Document metadata is now emitted once by librarian at processing # initiation, so we don't need to duplicate it here. @@ -142,11 +153,26 @@ class Processor(FlowProcessor): else: o_value = Term(type=LITERAL, value=str(o)) - triples.append(Triple( + # The relationship triple - this is the main extracted fact + relationship_triple = Triple( s=s_value, p=p_value, o=o_value - )) + ) + triples.append(relationship_triple) + + # Generate provenance for the relationship triple (reification) + stmt_uri = statement_uri() + prov_triples = triple_provenance_triples( + stmt_uri=stmt_uri, + extracted_triple=relationship_triple, + chunk_uri=chunk_uri, + component_name=default_ident, + component_version=COMPONENT_VERSION, + llm_model=llm_model, + ontology_uri=ontology_uri, + ) + triples.extend(prov_triples) # Label for s triples.append(Triple( diff --git a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py index eac33dde..4d4290b1 100755 --- a/trustgraph-flow/trustgraph/query/triples/cassandra/service.py +++ b/trustgraph-flow/trustgraph/query/triples/cassandra/service.py @@ -6,11 +6,13 @@ null. Output is a list of quads. import logging +import json + from .... direct.cassandra_kg import ( EntityCentricKnowledgeGraph, GRAPH_WILDCARD, DEFAULT_GRAPH ) from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error -from .... schema import Term, Triple, IRI, LITERAL +from .... schema import Term, Triple, IRI, LITERAL, TRIPLE from .... base import TriplesQueryService from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config @@ -33,6 +35,36 @@ def get_term_value(term): return term.id or term.value +def deserialize_term(term_dict): + """Deserialize a term from JSON structure.""" + if term_dict is None: + return None + term_type = term_dict.get("type", "") + if term_type == IRI: + return Term(type=IRI, iri=term_dict.get("iri", "")) + elif term_type == LITERAL: + return Term( + type=LITERAL, + value=term_dict.get("value", ""), + datatype=term_dict.get("datatype", ""), + language=term_dict.get("language", "") + ) + elif term_type == TRIPLE: + # Recursive for nested triples + nested = term_dict.get("triple") + if nested: + return Term( + type=TRIPLE, + triple=Triple( + s=deserialize_term(nested.get("s")), + p=deserialize_term(nested.get("p")), + o=deserialize_term(nested.get("o")), + ) + ) + # Fallback + return Term(type=LITERAL, value=str(term_dict)) + + def create_term(value, otype=None, dtype=None, lang=None): """ Create a Term from a string value, optionally using type metadata. @@ -57,8 +89,22 @@ def create_term(value, otype=None, dtype=None, lang=None): language=lang or "" ) elif otype == 't': - # Triple/reification - treat as IRI for now - return Term(type=IRI, iri=value) + # Triple/reification - parse JSON and create nested Triple + try: + triple_data = json.loads(value) if isinstance(value, str) else value + if isinstance(triple_data, dict): + return Term( + type=TRIPLE, + triple=Triple( + s=deserialize_term(triple_data.get("s")), + p=deserialize_term(triple_data.get("p")), + o=deserialize_term(triple_data.get("o")), + ) + ) + except (json.JSONDecodeError, TypeError) as e: + logger.warning(f"Failed to parse triple JSON: {e}") + # Fallback if parsing fails + return Term(type=LITERAL, value=str(value)) else: # Unknown otype, fall back to heuristic pass diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py index 21aa21e6..8e1c4485 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/milvus/write.py @@ -57,7 +57,8 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): self.vecstore.insert( vec, entity_value, message.metadata.user, - message.metadata.collection + message.metadata.collection, + chunk_id=entity.chunk_id or "", ) @staticmethod diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py index c4b0065b..f4de7f82 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/pinecone/write.py @@ -137,11 +137,15 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): # Generate unique ID for each vector vector_id = str(uuid.uuid4()) + metadata = {"entity": entity_value} + if entity.chunk_id: + metadata["chunk_id"] = entity.chunk_id + records = [ { "id": vector_id, "values": vec, - "metadata": { "entity": entity_value }, + "metadata": metadata, } ] diff --git a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py index 0da59bb9..4877ae96 100755 --- a/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py +++ b/trustgraph-flow/trustgraph/storage/graph_embeddings/qdrant/write.py @@ -90,15 +90,19 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService): ) ) + payload = { + "entity": entity_value, + } + if entity.chunk_id: + payload["chunk_id"] = entity.chunk_id + self.qdrant.upsert( collection_name=collection, points=[ PointStruct( id=str(uuid.uuid4()), vector=vec, - payload={ - "entity": entity_value, - } + payload=payload, ) ] ) diff --git a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py index 5bc842de..ab13ccbc 100755 --- a/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/cassandra/write.py @@ -9,6 +9,7 @@ import os import argparse import time import logging +import json from .... direct.cassandra_kg import ( EntityCentricKnowledgeGraph, DEFAULT_GRAPH @@ -25,6 +26,37 @@ logger = logging.getLogger(__name__) default_ident = "triples-write" +def serialize_triple(triple): + """Serialize a Triple object to JSON for storage.""" + if triple is None: + return None + + def term_to_dict(term): + if term is None: + return None + + result = {"type": term.type} + if term.type == IRI: + result["iri"] = term.iri + elif term.type == LITERAL: + result["value"] = term.value + if term.datatype: + result["datatype"] = term.datatype + if term.language: + result["language"] = term.language + elif term.type == BLANK: + result["id"] = term.id + elif term.type == TRIPLE: + result["triple"] = serialize_triple(term.triple) + return result + + return json.dumps({ + "s": term_to_dict(triple.s), + "p": term_to_dict(triple.p), + "o": term_to_dict(triple.o), + }) + + def get_term_value(term): """Extract the string value from a Term""" if term is None: @@ -33,6 +65,9 @@ def get_term_value(term): return term.iri elif term.type == LITERAL: return term.value + elif term.type == TRIPLE: + # Serialize nested triple as JSON + return serialize_triple(term.triple) else: # For blank nodes or other types, use id or value return term.id or term.value