diff --git a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py
index 816209c3..4ef70f78 100644
--- a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py
+++ b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py
@@ -9,8 +9,9 @@ import asyncio
 from typing import List, Dict, Any, Optional
 
 from .... schema import Chunk, Triple, Triples, Metadata, Value
+from .... schema import EntityContext, EntityContexts
 from .... schema import PromptRequest, PromptResponse
-from .... rdf import TRUSTGRAPH_ENTITIES, RDF_TYPE, RDF_LABEL
+from .... rdf import TRUSTGRAPH_ENTITIES, RDF_TYPE, RDF_LABEL, DEFINITION
 from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
 from .... base import PromptClientSpec, EmbeddingsClientSpec
@@ -25,6 +26,16 @@ logger = logging.getLogger(__name__)
 default_ident = "kg-extract-ontology"
 default_concurrency = 1
 
+# URI prefix mappings for common namespaces
+URI_PREFIXES = {
+    "rdf:": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
+    "rdfs:": "http://www.w3.org/2000/01/rdf-schema#",
+    "owl:": "http://www.w3.org/2002/07/owl#",
+    "skos:": "http://www.w3.org/2004/02/skos/core#",
+    "schema:": "https://schema.org/",
+    "xsd:": "http://www.w3.org/2001/XMLSchema#",
+}
+
 
 class Processor(FlowProcessor):
     """Main OntoRAG extraction processor."""
@@ -71,6 +82,13 @@ class Processor(FlowProcessor):
             )
         )
 
+        self.register_specification(
+            ProducerSpec(
+                name="entity-contexts",
+                schema=EntityContexts
+            )
+        )
+
         # Register config handler for ontology updates
        self.register_config_handler(self.on_ontology_config)
@@ -254,12 +272,17 @@ class Processor(FlowProcessor):
 
             if not ontology_subsets:
                 logger.warning("No relevant ontology elements found for chunk")
-                # Emit empty triples
+                # Emit empty outputs
                 await self.emit_triples(
                     flow("triples"),
                     v.metadata,
                     []
                 )
+                await self.emit_entity_contexts(
+                    flow("entity-contexts"),
+                    v.metadata,
+                    []
+                )
                 return
 
             # Merge subsets if multiple ontologies matched
@@ -299,6 +322,9 @@ class Processor(FlowProcessor):
             for t in v.metadata.metadata:
                 triples.append(t)
 
+            # Build entity contexts from triples
+            entity_contexts = self.build_entity_contexts(triples)
+
             # Emit triples
             await self.emit_triples(
                 flow("triples"),
@@ -306,16 +332,28 @@ class Processor(FlowProcessor):
                 v.metadata,
                 triples
             )
 
-            logger.info(f"Extracted {len(triples)} ontology-conformant triples")
+            # Emit entity contexts
+            await self.emit_entity_contexts(
+                flow("entity-contexts"),
+                v.metadata,
+                entity_contexts
+            )
+
+            logger.info(f"Extracted {len(triples)} ontology-conformant triples and {len(entity_contexts)} entity contexts")
 
         except Exception as e:
             logger.error(f"OntoRAG extraction exception: {e}", exc_info=True)
-            # Emit empty triples on error
+            # Emit empty outputs on error
             await self.emit_triples(
                 flow("triples"),
                 v.metadata,
                 []
             )
+            await self.emit_entity_contexts(
+                flow("entity-contexts"),
+                v.metadata,
+                []
+            )
 
     def build_extraction_variables(self, chunk: str,
                                    ontology_subset: OntologySubset) -> Dict[str, Any]:
         """Build variables for ontology-based extraction prompt template.
@@ -338,6 +376,7 @@ class Processor(FlowProcessor):
                       ontology_subset: OntologySubset) -> List[Triple]:
         """Parse and validate extracted triples against ontology."""
         validated_triples = []
+        ontology_id = ontology_subset.ontology_id
 
         for triple_data in triples_response:
             try:
@@ -351,10 +390,22 @@ class Processor(FlowProcessor):
 
                 # Validate against ontology
                 if self.is_valid_triple(subject, predicate, object_val, ontology_subset):
-                    # Create Triple object
-                    s_value = Value(value=subject, is_uri=self.is_uri(subject))
-                    p_value = Value(value=predicate, is_uri=True)
-                    o_value = Value(value=object_val, is_uri=self.is_uri(object_val))
+                    # Expand URIs before creating Value objects
+                    subject_uri = self.expand_uri(subject, ontology_subset, ontology_id)
+                    predicate_uri = self.expand_uri(predicate, ontology_subset, ontology_id)
+
+                    # Object might be URI or literal - check before expanding
+                    if self.is_uri(object_val) or self.should_expand_as_uri(object_val, ontology_subset):
+                        object_uri = self.expand_uri(object_val, ontology_subset, ontology_id)
+                        is_object_uri = True
+                    else:
+                        object_uri = object_val
+                        is_object_uri = False
+
+                    # Create Triple object with expanded URIs
+                    s_value = Value(value=subject_uri, is_uri=True)
+                    p_value = Value(value=predicate_uri, is_uri=True)
+                    o_value = Value(value=object_uri, is_uri=is_object_uri)
 
                     validated_triples.append(Triple(
                         s=s_value,
@@ -369,6 +420,27 @@ class Processor(FlowProcessor):
 
         return validated_triples
 
+    def should_expand_as_uri(self, value: str, ontology_subset: OntologySubset) -> bool:
+        """Check if a value should be treated as URI (not literal).
+
+        Returns True if value is a class name, property name, or entity reference.
+        """
+        # Check if it's a class or property from ontology
+        if value in ontology_subset.classes:
+            return True
+        if value in ontology_subset.object_properties:
+            return True
+        if value in ontology_subset.datatype_properties:
+            return True
+        # Check if it starts with a known prefix
+        for prefix in URI_PREFIXES.keys():
+            if value.startswith(prefix):
+                return True
+        # Check if it looks like an entity reference (e.g., "recipe:cornish-pasty")
+        if ":" in value and not value.startswith("http"):
+            return True
+        return False
+
     def is_valid_triple(self, subject: str, predicate: str, object_val: str,
                         ontology_subset: OntologySubset) -> bool:
         """Validate triple against ontology constraints."""
@@ -391,11 +463,55 @@ class Processor(FlowProcessor):
         # TODO: Add more sophisticated validation (domain/range checking)
         return True
 
+    def expand_uri(self, value: str, ontology_subset: OntologySubset, ontology_id: str = "unknown") -> str:
+        """Expand prefix notation or short names to full URIs.
+
+        Args:
+            value: Value to expand (e.g., "rdf:type", "Recipe", "has_ingredient")
+            ontology_subset: Ontology subset for class/property lookup
+            ontology_id: ID of the ontology for constructing instance URIs
+
+        Returns:
+            Full URI string
+        """
+        # Already a full URI
+        if value.startswith("http://") or value.startswith("https://"):
+            return value
+
+        # Check standard prefixes (rdf:, rdfs:, etc.)
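+        # e.g. "rdf:type" expands to "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"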
+        for prefix, namespace in URI_PREFIXES.items():
+            if value.startswith(prefix):
+                return namespace + value[len(prefix):]
+
+        # Check if it's an ontology class
+        if value in ontology_subset.classes:
+            class_def = ontology_subset.classes[value]
+            if hasattr(class_def, 'uri') and class_def.uri:
+                return class_def.uri
+            # Fallback: construct URI
+            return f"https://trustgraph.ai/ontology/{ontology_id}#{value}"
+
+        # Check if it's an ontology property
+        if value in ontology_subset.object_properties:
+            prop_def = ontology_subset.object_properties[value]
+            if hasattr(prop_def, 'uri') and prop_def.uri:
+                return prop_def.uri
+            return f"https://trustgraph.ai/ontology/{ontology_id}#{value}"
+
+        if value in ontology_subset.datatype_properties:
+            prop_def = ontology_subset.datatype_properties[value]
+            if hasattr(prop_def, 'uri') and prop_def.uri:
+                return prop_def.uri
+            return f"https://trustgraph.ai/ontology/{ontology_id}#{value}"
+
+        # Otherwise, treat as entity instance - construct unique URI
+        # Normalize the value for URI (lowercase, replace spaces with hyphens)
+        normalized = value.replace(" ", "-").lower()
+        return f"https://trustgraph.ai/{ontology_id}/{normalized}"
+
     def is_uri(self, value: str) -> bool:
-        """Check if value is a URI."""
-        return value.startswith("http://") or value.startswith("https://") or \
-               value.startswith(str(TRUSTGRAPH_ENTITIES)) or \
-               value in ["rdf:type", "rdfs:label"]
+        """Check if value is already a full URI."""
+        return value.startswith("http://") or value.startswith("https://")
 
     async def emit_triples(self, pub, metadata: Metadata, triples: List[Triple]):
         """Emit triples to output."""
@@ -410,6 +526,76 @@ class Processor(FlowProcessor):
         )
         await pub.send(t)
 
+    async def emit_entity_contexts(self, pub, metadata: Metadata, entities: List[EntityContext]):
+        """Emit entity contexts to output."""
+        ec = EntityContexts(
+            metadata=Metadata(
+                id=metadata.id,
+                metadata=[],
+                user=metadata.user,
+                collection=metadata.collection,
+            ),
+            entities=entities,
+        )
+        await pub.send(ec)
+
+    def build_entity_contexts(self, triples: List[Triple]) -> List[EntityContext]:
+        """Build entity contexts from extracted triples.
+
+        Collects rdfs:label and definition properties for each entity to create
+        contextual descriptions for embedding.
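+        Only entities with at least one label or definition yield a context.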
+
+        Args:
+            triples: List of extracted triples
+
+        Returns:
+            List of EntityContext objects
+        """
+        # Group triples by subject to collect entity information
+        entity_data = {}  # subject_uri -> {labels: [], definitions: []}
+
+        for triple in triples:
+            subject_uri = triple.s.value
+            predicate_uri = triple.p.value
+            object_val = triple.o.value
+
+            # Initialize entity data if not exists
+            if subject_uri not in entity_data:
+                entity_data[subject_uri] = {'labels': [], 'definitions': []}
+
+            # Collect labels (rdfs:label)
+            if predicate_uri == RDF_LABEL:
+                if not triple.o.is_uri:  # Labels are literals
+                    entity_data[subject_uri]['labels'].append(object_val)
+
+            # Collect definitions (skos:definition, schema:description)
+            elif predicate_uri == DEFINITION or predicate_uri == "https://schema.org/description":
+                if not triple.o.is_uri:
+                    entity_data[subject_uri]['definitions'].append(object_val)
+
+        # Build EntityContext objects
+        entity_contexts = []
+        for subject_uri, data in entity_data.items():
+            # Build context text from labels and definitions
+            context_parts = []
+
+            if data['labels']:
+                context_parts.append(f"Label: {data['labels'][0]}")
+
+            if data['definitions']:
+                context_parts.extend(data['definitions'])
+
+            # Only create EntityContext if we have meaningful context
+            if context_parts:
+                context_text = ". ".join(context_parts)
+                entity_contexts.append(EntityContext(
+                    entity=Value(value=subject_uri, is_uri=True),
+                    context=context_text
+                ))
+
+        logger.debug(f"Built {len(entity_contexts)} entity contexts from {len(triples)} triples")
+        return entity_contexts
+
     @staticmethod
     def add_args(parser):
         """Add command-line arguments."""