Refactored URI stuff

This commit is contained in:
Cyber MacGeddon 2025-11-12 17:40:57 +00:00
parent bf067e1d2d
commit 828540aebc

View file

@ -9,8 +9,9 @@ import asyncio
from typing import List, Dict, Any, Optional
from .... schema import Chunk, Triple, Triples, Metadata, Value
from .... schema import EntityContext, EntityContexts
from .... schema import PromptRequest, PromptResponse
from .... rdf import TRUSTGRAPH_ENTITIES, RDF_TYPE, RDF_LABEL
from .... rdf import TRUSTGRAPH_ENTITIES, RDF_TYPE, RDF_LABEL, DEFINITION
from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
from .... base import PromptClientSpec, EmbeddingsClientSpec
@ -25,6 +26,16 @@ logger = logging.getLogger(__name__)
default_ident = "kg-extract-ontology"
default_concurrency = 1
# URI prefix mappings for common namespaces.
# Each key is a prefix written with its trailing colon (e.g. "rdf:") so it can
# be matched with str.startswith(); each value is the namespace URI that
# replaces the prefix when a prefixed name is expanded.
URI_PREFIXES = {
"rdf:": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs:": "http://www.w3.org/2000/01/rdf-schema#",
"owl:": "http://www.w3.org/2002/07/owl#",
"skos:": "http://www.w3.org/2004/02/skos/core#",
"schema:": "https://schema.org/",
"xsd:": "http://www.w3.org/2001/XMLSchema#",
}
class Processor(FlowProcessor):
"""Main OntoRAG extraction processor."""
@ -71,6 +82,13 @@ class Processor(FlowProcessor):
)
)
self.register_specification(
ProducerSpec(
name="entity-contexts",
schema=EntityContexts
)
)
# Register config handler for ontology updates
self.register_config_handler(self.on_ontology_config)
@ -254,12 +272,17 @@ class Processor(FlowProcessor):
if not ontology_subsets:
logger.warning("No relevant ontology elements found for chunk")
# Emit empty triples
# Emit empty outputs
await self.emit_triples(
flow("triples"),
v.metadata,
[]
)
await self.emit_entity_contexts(
flow("entity-contexts"),
v.metadata,
[]
)
return
# Merge subsets if multiple ontologies matched
@ -299,6 +322,9 @@ class Processor(FlowProcessor):
for t in v.metadata.metadata:
triples.append(t)
# Build entity contexts from triples
entity_contexts = self.build_entity_contexts(triples)
# Emit triples
await self.emit_triples(
flow("triples"),
@ -306,16 +332,28 @@ class Processor(FlowProcessor):
triples
)
logger.info(f"Extracted {len(triples)} ontology-conformant triples")
# Emit entity contexts
await self.emit_entity_contexts(
flow("entity-contexts"),
v.metadata,
entity_contexts
)
logger.info(f"Extracted {len(triples)} ontology-conformant triples and {len(entity_contexts)} entity contexts")
except Exception as e:
logger.error(f"OntoRAG extraction exception: {e}", exc_info=True)
# Emit empty triples on error
# Emit empty outputs on error
await self.emit_triples(
flow("triples"),
v.metadata,
[]
)
await self.emit_entity_contexts(
flow("entity-contexts"),
v.metadata,
[]
)
def build_extraction_variables(self, chunk: str, ontology_subset: OntologySubset) -> Dict[str, Any]:
"""Build variables for ontology-based extraction prompt template.
@ -338,6 +376,7 @@ class Processor(FlowProcessor):
ontology_subset: OntologySubset) -> List[Triple]:
"""Parse and validate extracted triples against ontology."""
validated_triples = []
ontology_id = ontology_subset.ontology_id
for triple_data in triples_response:
try:
@ -351,10 +390,22 @@ class Processor(FlowProcessor):
# Validate against ontology
if self.is_valid_triple(subject, predicate, object_val, ontology_subset):
# Create Triple object
s_value = Value(value=subject, is_uri=self.is_uri(subject))
p_value = Value(value=predicate, is_uri=True)
o_value = Value(value=object_val, is_uri=self.is_uri(object_val))
# Expand URIs before creating Value objects
subject_uri = self.expand_uri(subject, ontology_subset, ontology_id)
predicate_uri = self.expand_uri(predicate, ontology_subset, ontology_id)
# Object might be URI or literal - check before expanding
if self.is_uri(object_val) or self.should_expand_as_uri(object_val, ontology_subset):
object_uri = self.expand_uri(object_val, ontology_subset, ontology_id)
is_object_uri = True
else:
object_uri = object_val
is_object_uri = False
# Create Triple object with expanded URIs
s_value = Value(value=subject_uri, is_uri=True)
p_value = Value(value=predicate_uri, is_uri=True)
o_value = Value(value=object_uri, is_uri=is_object_uri)
validated_triples.append(Triple(
s=s_value,
@ -369,6 +420,27 @@ class Processor(FlowProcessor):
return validated_triples
def should_expand_as_uri(self, value: str, ontology_subset: OntologySubset) -> bool:
    """Check if a value should be treated as a URI rather than a literal.

    Args:
        value: Raw object value taken from an extracted triple.
        ontology_subset: Ontology subset whose ``classes``,
            ``object_properties`` and ``datatype_properties`` are consulted.

    Returns:
        True if ``value`` names an ontology class or property, starts with
        one of the ``URI_PREFIXES`` keys, or has the shape of a prefixed
        entity reference such as "recipe:cornish-pasty"; False otherwise.
    """
    # Names declared by the ontology always denote resources.
    if (
        value in ontology_subset.classes
        or value in ontology_subset.object_properties
        or value in ontology_subset.datatype_properties
    ):
        return True

    # Well-known namespace prefixes (rdf:, rdfs:, ...).
    if value.startswith(tuple(URI_PREFIXES)):
        return True

    # Values starting with "http" are never treated as prefixed references.
    if value.startswith("http"):
        return False

    prefix, colon, local = value.partition(":")
    if not colon:
        return False

    # A colon alone is not evidence of an entity reference: free-text
    # literals such as "Note: bake for 20 minutes" or "10:30" contain one
    # too. Require a CURIE-like shape: no whitespace in the prefix, a prefix
    # that does not start with a digit or punctuation, and no whitespace
    # directly after the colon.
    if any(ch.isspace() for ch in prefix):
        return False
    if prefix and not (prefix[0].isalpha() or prefix[0] == "_"):
        return False
    return not local[:1].isspace()
def is_valid_triple(self, subject: str, predicate: str, object_val: str,
ontology_subset: OntologySubset) -> bool:
"""Validate triple against ontology constraints."""
@ -391,11 +463,55 @@ class Processor(FlowProcessor):
# TODO: Add more sophisticated validation (domain/range checking)
return True
def expand_uri(self, value: str, ontology_subset: OntologySubset, ontology_id: str = "unknown") -> str:
    """Turn a prefixed name, ontology term or bare entity name into a full URI.

    Args:
        value: Value to expand (e.g., "rdf:type", "Recipe", "has_ingredient")
        ontology_subset: Ontology subset used to look up classes and
            properties by name
        ontology_id: Ontology identifier embedded in constructed URIs

    Returns:
        Full URI string
    """
    # Full URIs pass through untouched.
    if value.startswith(("http://", "https://")):
        return value

    # Standard prefixes (rdf:, rdfs:, etc.): swap the prefix for its namespace.
    for short, namespace_uri in URI_PREFIXES.items():
        if value.startswith(short):
            return namespace_uri + value[len(short):]

    # Ontology terms: classes first, then object properties, then datatype
    # properties. Use the definition's own URI when it has a non-empty one,
    # otherwise construct one under the ontology namespace.
    for table_name in ("classes", "object_properties", "datatype_properties"):
        table = getattr(ontology_subset, table_name)
        if value in table:
            declared_uri = getattr(table[value], 'uri', None)
            if declared_uri:
                return declared_uri
            return f"https://trustgraph.ai/ontology/{ontology_id}#{value}"

    # Anything else is an entity instance: spaces become hyphens and the
    # name is lowercased before being placed under the ontology id.
    slug = value.replace(" ", "-").lower()
    return f"https://trustgraph.ai/{ontology_id}/{slug}"
def is_uri(self, value: str) -> bool:
    """Check if value is already a full URI.

    Only absolute ``http://`` / ``https://`` values count. Prefixed names
    such as "rdf:type" are not full URIs and return False.

    Args:
        value: String to test.

    Returns:
        True if ``value`` starts with "http://" or "https://".
    """
    return value.startswith(("http://", "https://"))
async def emit_triples(self, pub, metadata: Metadata, triples: List[Triple]):
"""Emit triples to output."""
@ -410,6 +526,76 @@ class Processor(FlowProcessor):
)
await pub.send(t)
async def emit_entity_contexts(self, pub, metadata: Metadata, entities: List[EntityContext]):
    """Send the given entity contexts as one EntityContexts message.

    Args:
        pub: Publisher; the message is passed to its awaitable ``send``.
        metadata: Source metadata; ``id``, ``user`` and ``collection`` are
            copied to the outgoing message.
        entities: Entity contexts to include (may be empty).
    """
    # The outgoing metadata carries an empty ``metadata`` list rather than
    # the list held by the incoming metadata.
    outgoing_metadata = Metadata(
        id=metadata.id,
        metadata=[],
        user=metadata.user,
        collection=metadata.collection,
    )
    message = EntityContexts(metadata=outgoing_metadata, entities=entities)
    await pub.send(message)
def build_entity_contexts(self, triples: List[Triple]) -> List[EntityContext]:
    """Derive embedding-ready entity descriptions from extracted triples.

    For every subject, literal objects of the label predicate (RDF_LABEL)
    and of the definition predicates (DEFINITION or
    "https://schema.org/description") are gathered. A subject with at least
    one such value yields one EntityContext whose text is the first label
    (prefixed with "Label: ") followed by all definitions, joined by ". ".

    Args:
        triples: List of extracted triples

    Returns:
        List of EntityContext objects, in order of first subject appearance
    """
    # subject URI -> {'labels': [...], 'definitions': [...]}
    collected = {}

    for t in triples:
        predicate = t.p.value
        # Every subject gets a bucket, even if nothing ends up in it.
        bucket = collected.setdefault(t.s.value, {'labels': [], 'definitions': []})

        if predicate == RDF_LABEL:
            target = bucket['labels']
        elif predicate == DEFINITION or predicate == "https://schema.org/description":
            target = bucket['definitions']
        else:
            continue

        # Only literal objects are usable as descriptive text.
        if not t.o.is_uri:
            target.append(t.o.value)

    contexts = []
    for uri, info in collected.items():
        parts = [f"Label: {info['labels'][0]}"] if info['labels'] else []
        parts.extend(info['definitions'])

        # Subjects without any label or definition produce no context.
        if not parts:
            continue

        contexts.append(EntityContext(
            entity=Value(value=uri, is_uri=True),
            context=". ".join(parts)
        ))

    logger.debug(f"Built {len(contexts)} entity contexts from {len(triples)} triples")
    return contexts
@staticmethod
def add_args(parser):
"""Add command-line arguments."""