Feature/improve ontology extract (#576)

* Tech spec to change ontology extraction

* Ontology extract refactoring
This commit is contained in:
cybermaggedon 2025-12-03 13:36:10 +00:00 committed by GitHub
parent 517434c075
commit b957004db9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 1496 additions and 19 deletions

View file

@ -0,0 +1,164 @@
"""
Entity URI normalization for ontology-based knowledge extraction.
Converts entity names and types into consistent, collision-free URIs.
"""
import re
from typing import Dict, Optional, Tuple
def normalize_entity_name(entity_name: str) -> str:
    """Normalize entity name to URI-safe identifier.

    Args:
        entity_name: Natural language entity name (e.g., "Cornish pasty")

    Returns:
        Normalized identifier (e.g., "cornish-pasty")
    """
    # Lowercase, then turn runs of whitespace/underscores into hyphens
    slug = re.sub(r'[\s_]+', '-', entity_name.lower())
    # Drop everything except lowercase letters, digits, hyphens, and periods
    slug = re.sub(r'[^a-z0-9\-.]', '', slug)
    # Trim edge hyphens, then squash internal hyphen runs to a single one
    return re.sub(r'-+', '-', slug.strip('-'))
def normalize_type_identifier(type_id: str) -> str:
    """Normalize ontology type identifier to URI-safe format.

    Handles prefixed types like "fo/Recipe" by converting to "fo-recipe".

    Args:
        type_id: Ontology type identifier (e.g., "fo/Recipe", "Food")

    Returns:
        Normalized type identifier (e.g., "fo-recipe", "food")
    """
    # Lowercase, then map separator characters (slash, colon, period,
    # whitespace, underscore) onto hyphens
    slug = re.sub(r'[/:.\s_]+', '-', type_id.lower())
    # Strip any character that is not a lowercase letter, digit, or hyphen
    slug = re.sub(r'[^a-z0-9\-]', '', slug)
    # Trim edge hyphens, then collapse hyphen runs
    return re.sub(r'-+', '-', slug.strip('-'))
def build_entity_uri(entity_name: str, entity_type: str, ontology_id: str,
                     base_uri: str = "https://trustgraph.ai") -> str:
    """Build a unique URI for an entity based on its name and type.

    The type is baked into the URI so the same name used with two different
    entity types (e.g., "Cornish pasty" as both Recipe and Food) yields two
    distinct, collision-free URIs.

    Args:
        entity_name: Natural language entity name (e.g., "Cornish pasty")
        entity_type: Ontology type (e.g., "fo/Recipe")
        ontology_id: Ontology identifier (e.g., "food")
        base_uri: Base URI for entity URIs (default: "https://trustgraph.ai")

    Returns:
        Full entity URI (e.g., "https://trustgraph.ai/food/fo-recipe-cornish-pasty")

    Examples:
        >>> build_entity_uri("Cornish pasty", "fo/Recipe", "food")
        'https://trustgraph.ai/food/fo-recipe-cornish-pasty'
        >>> build_entity_uri("Cornish pasty", "fo/Food", "food")
        'https://trustgraph.ai/food/fo-food-cornish-pasty'
        >>> build_entity_uri("beef", "fo/Food", "food")
        'https://trustgraph.ai/food/fo-food-beef'
    """
    # Join normalized type and name so the pair is unique per (name, type)
    entity_id = "-".join([
        normalize_type_identifier(entity_type),
        normalize_entity_name(entity_name),
    ])
    return f"{base_uri}/{ontology_id}/{entity_id}"
class EntityRegistry:
    """Registry to track entity name/type tuples and their assigned URIs.

    Ensures that the same (entity_name, entity_type) tuple always maps
    to the same URI, enabling deduplication across the extraction process.
    """

    def __init__(self, ontology_id: str, base_uri: str = "https://trustgraph.ai"):
        """Initialize the entity registry.

        Args:
            ontology_id: Ontology identifier (e.g., "food")
            base_uri: Base URI for entity URIs
        """
        self.ontology_id = ontology_id
        self.base_uri = base_uri
        # Maps (entity_name, entity_type) -> assigned URI
        self._registry: Dict[Tuple[str, str], str] = {}

    def get_or_create_uri(self, entity_name: str, entity_type: str) -> str:
        """Get existing URI or create new one for entity.

        Args:
            entity_name: Natural language entity name
            entity_type: Ontology type identifier

        Returns:
            URI for this entity (same URI for same name/type tuple)
        """
        key = (entity_name, entity_type)
        if key not in self._registry:
            # First time this (name, type) pair is seen: mint a URI for it
            self._registry[key] = build_entity_uri(
                entity_name,
                entity_type,
                self.ontology_id,
                self.base_uri
            )
        return self._registry[key]

    def lookup(self, entity_name: str, entity_type: str) -> Optional[str]:
        """Look up URI for entity (returns None if not registered).

        Args:
            entity_name: Natural language entity name
            entity_type: Ontology type identifier

        Returns:
            URI for this entity, or None if not found
        """
        # Fixed annotation: this returns None for unregistered entities,
        # so the return type is Optional[str], not str.
        return self._registry.get((entity_name, entity_type))

    def clear(self):
        """Clear all registered entities."""
        self._registry.clear()

    def size(self) -> int:
        """Get number of registered entities."""
        return len(self._registry)

View file

@ -20,6 +20,8 @@ from .ontology_embedder import OntologyEmbedder
from .vector_store import InMemoryVectorStore
from .text_processor import TextProcessor
from .ontology_selector import OntologySelector, OntologySubset
from .simplified_parser import parse_extraction_response
from .triple_converter import TripleConverter
logger = logging.getLogger(__name__)
@ -298,25 +300,10 @@ class Processor(FlowProcessor):
# Build extraction prompt variables
prompt_variables = self.build_extraction_variables(chunk, ontology_subset)
# Call prompt service for extraction
try:
# Use prompt() method with extract-with-ontologies prompt ID
triples_response = await flow("prompt-request").prompt(
id="extract-with-ontologies",
variables=prompt_variables
)
logger.debug(f"Extraction response: {triples_response}")
if not isinstance(triples_response, list):
logger.error("Expected list of triples from prompt service")
triples_response = []
except Exception as e:
logger.error(f"Prompt service error: {e}", exc_info=True)
triples_response = []
# Parse and validate triples
triples = self.parse_and_validate_triples(triples_response, ontology_subset)
# Extract using simplified entity-relationship-attribute format
triples = await self.extract_with_simplified_format(
flow, chunk, ontology_subset, prompt_variables
)
# Add metadata triples
for t in v.metadata.metadata:
@ -362,6 +349,55 @@ class Processor(FlowProcessor):
[]
)
async def extract_with_simplified_format(
    self,
    flow,
    chunk: str,
    ontology_subset: OntologySubset,
    prompt_variables: Dict[str, Any]
) -> List[Triple]:
    """Extract triples using simplified entity-relationship-attribute format.

    Args:
        flow: Flow object for accessing services
        chunk: Text chunk to extract from
        ontology_subset: Selected ontology subset
        prompt_variables: Variables for prompt template

    Returns:
        List of Triple objects (empty on any extraction/parsing failure)
    """
    try:
        # Ask the prompt service to run the simplified-format extraction
        response = await flow("prompt-request").prompt(
            id="extract-with-ontologies",
            variables=prompt_variables
        )
        logger.debug(f"Simplified extraction response: {response}")

        # Turn the raw LLM response into structured entities/relationships/attributes
        parsed = parse_extraction_response(response)
        if not parsed:
            logger.warning("Failed to parse extraction response")
            return []

        logger.info(f"Parsed {len(parsed.entities)} entities, "
                    f"{len(parsed.relationships)} relationships, "
                    f"{len(parsed.attributes)} attributes")

        # Map the structured extraction onto RDF triples via the ontology
        converter = TripleConverter(ontology_subset, ontology_subset.ontology_id)
        rdf_triples = converter.convert_all(parsed)
        logger.info(f"Generated {len(rdf_triples)} RDF triples from simplified extraction")
        return rdf_triples

    except Exception as e:
        # Extraction is best-effort per chunk: log and yield nothing
        logger.error(f"Simplified extraction error: {e}", exc_info=True)
        return []
def build_extraction_variables(self, chunk: str, ontology_subset: OntologySubset) -> Dict[str, Any]:
"""Build variables for ontology-based extraction prompt template.

View file

@ -0,0 +1,234 @@
"""
Parser for simplified ontology extraction JSON format.
Parses the new entity-relationship-attribute format from LLM responses.
"""
import json
import logging
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class Entity:
    """Represents an extracted entity."""
    entity: str  # natural-language entity name as it appeared in the text
    type: str    # ontology type identifier (e.g. "fo/Recipe")
@dataclass
class Relationship:
    """Represents an extracted relationship."""
    subject: str       # subject entity name
    subject_type: str  # ontology type of the subject entity
    relation: str      # relationship/property identifier
    object: str        # object entity name
    object_type: str   # ontology type of the object entity
@dataclass
class Attribute:
    """Represents an extracted attribute."""
    entity: str       # entity name the attribute belongs to
    entity_type: str  # ontology type of the entity
    attribute: str    # attribute/property identifier
    value: str        # literal value (always stored as a string)
@dataclass
class ExtractionResult:
    """Complete extraction result."""
    entities: List[Entity]            # all successfully parsed entities
    relationships: List[Relationship] # all successfully parsed relationships
    attributes: List[Attribute]       # all successfully parsed attributes
def parse_extraction_response(response: Any) -> Optional[ExtractionResult]:
    """Parse LLM extraction response into structured format.

    Args:
        response: LLM response (string JSON or already parsed dict)

    Returns:
        ExtractionResult with parsed entities/relationships/attributes,
        or None if parsing fails
    """
    # Handle string response (parse JSON)
    if isinstance(response, str):
        try:
            data = json.loads(response)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON response: {e}")
            logger.debug(f"Response was: {response[:500]}")
            return None
    elif isinstance(response, dict):
        data = response
    else:
        logger.error(f"Unexpected response type: {type(response)}")
        return None

    # Validate structure (a JSON string may decode to a non-dict, e.g. a list)
    if not isinstance(data, dict):
        logger.error(f"Expected dict, got {type(data)}")
        return None

    # The three sections share identical parse-and-collect logic; delegate
    # to one helper instead of repeating the loop three times.
    return ExtractionResult(
        entities=_parse_section(data, 'entities', parse_entity, 'entity'),
        relationships=_parse_section(
            data, 'relationships', parse_relationship, 'relationship'),
        attributes=_parse_section(data, 'attributes', parse_attribute, 'attribute'),
    )


def _parse_section(data: Dict[str, Any], key: str, item_parser, label: str) -> List[Any]:
    """Parse one list-valued section of the extraction response.

    Args:
        data: Decoded response dict
        key: Section key ('entities', 'relationships', or 'attributes')
        item_parser: Per-item parser returning an object or None
        label: Singular noun used in per-item warning messages

    Returns:
        List of successfully parsed items (invalid items are logged and skipped)
    """
    items_data = data.get(key, [])
    if not isinstance(items_data, list):
        logger.warning(f"'{key}' is not a list: {type(items_data)}")
        items_data = []

    parsed = []
    for item_data in items_data:
        try:
            item = item_parser(item_data)
            if item:
                parsed.append(item)
        except Exception as e:
            # One malformed item must not abort the whole section
            logger.warning(f"Failed to parse {label} {item_data}: {e}")
    return parsed
def parse_entity(data: Dict[str, Any]) -> Optional[Entity]:
    """Parse a single entity record from a dict.

    Args:
        data: Entity dict with 'entity' and 'type' fields

    Returns:
        Entity object, or None when the record is not a dict, is missing
        a field, or has a non-string field
    """
    if not isinstance(data, dict):
        logger.warning(f"Entity data is not a dict: {type(data)}")
        return None

    name = data.get('entity')
    kind = data.get('type')

    # Both fields must be present and non-empty
    if not name or not kind:
        logger.warning(f"Missing required fields in entity: {data}")
        return None
    # ...and both must be strings
    if not (isinstance(name, str) and isinstance(kind, str)):
        logger.warning(f"Entity fields must be strings: {data}")
        return None

    return Entity(entity=name, type=kind)
def parse_relationship(data: Dict[str, Any]) -> Optional[Relationship]:
    """Parse a single relationship record from a dict.

    Accepts both kebab-case ('subject-type', 'object-type') and snake_case
    ('subject_type', 'object_type') field names for compatibility.

    Args:
        data: Relationship dict with subject, subject-type, relation,
            object, object-type

    Returns:
        Relationship object, or None when the record is invalid
    """
    if not isinstance(data, dict):
        logger.warning(f"Relationship data is not a dict: {type(data)}")
        return None

    fields = (
        data.get('subject'),
        data.get('subject-type') or data.get('subject_type'),
        data.get('relation'),
        data.get('object'),
        data.get('object-type') or data.get('object_type'),
    )

    # Every field must be present and non-empty
    if not all(fields):
        logger.warning(f"Missing required fields in relationship: {data}")
        return None
    # ...and every field must be a string
    if not all(isinstance(f, str) for f in fields):
        logger.warning(f"Relationship fields must be strings: {data}")
        return None

    subject, subject_type, relation, obj, object_type = fields
    return Relationship(
        subject=subject,
        subject_type=subject_type,
        relation=relation,
        object=obj,
        object_type=object_type,
    )
def parse_attribute(data: Dict[str, Any]) -> Optional[Attribute]:
    """Parse a single attribute record from a dict.

    Accepts both kebab-case ('entity-type') and snake_case ('entity_type')
    field names for compatibility.

    Args:
        data: Attribute dict with entity, entity-type, attribute, value

    Returns:
        Attribute object, or None when the record is invalid
    """
    if not isinstance(data, dict):
        logger.warning(f"Attribute data is not a dict: {type(data)}")
        return None

    entity = data.get('entity')
    entity_type = data.get('entity-type') or data.get('entity_type')
    attribute = data.get('attribute')
    value = data.get('value')

    # entity/entity_type/attribute must be non-empty; value only needs to
    # exist (0, False, and "" are legitimate attribute values)
    if not (entity and entity_type and attribute and value is not None):
        logger.warning(f"Missing required fields in attribute: {data}")
        return None
    if not (isinstance(entity, str) and isinstance(entity_type, str)
            and isinstance(attribute, str)):
        logger.warning(f"Attribute fields must be strings: {data}")
        return None

    # Numbers and booleans are coerced to their string form
    if not isinstance(value, str):
        value = str(value)

    return Attribute(
        entity=entity,
        entity_type=entity_type,
        attribute=attribute,
        value=value,
    )

View file

@ -0,0 +1,228 @@
"""
Converts simplified extraction format to RDF triples.
Transforms entities, relationships, and attributes into proper RDF triples
with full URIs and correct is_uri flags.
"""
import logging
from typing import List, Optional
from .... schema import Triple, Value
from .... rdf import RDF_TYPE, RDF_LABEL
from .simplified_parser import Entity, Relationship, Attribute, ExtractionResult
from .entity_normalizer import EntityRegistry
from .ontology_selector import OntologySubset
logger = logging.getLogger(__name__)
class TripleConverter:
    """Converts extraction results to RDF triples."""

    def __init__(self, ontology_subset: OntologySubset, ontology_id: str):
        """Initialize converter.

        Args:
            ontology_subset: Ontology subset with classes and properties
            ontology_id: Ontology identifier for URI generation
        """
        self.ontology_subset = ontology_subset
        self.ontology_id = ontology_id
        # Shared registry so the same (name, type) pair always maps to one URI
        self.entity_registry = EntityRegistry(ontology_id)

    def convert_all(self, extraction: ExtractionResult) -> List[Triple]:
        """Convert complete extraction result to RDF triples.

        Args:
            extraction: Parsed extraction with entities/relationships/attributes

        Returns:
            List of RDF Triple objects
        """
        triples = []

        # Convert entities (generates type + label triples)
        for entity in extraction.entities:
            triples.extend(self.convert_entity(entity))

        # Convert relationships
        for relationship in extraction.relationships:
            rel_triple = self.convert_relationship(relationship)
            if rel_triple:
                triples.append(rel_triple)

        # Convert attributes
        for attribute in extraction.attributes:
            attr_triple = self.convert_attribute(attribute)
            if attr_triple:
                triples.append(attr_triple)

        return triples

    def convert_entity(self, entity: Entity) -> List[Triple]:
        """Convert entity to RDF triples (type + label).

        Args:
            entity: Entity object with name and type

        Returns:
            List containing type triple and label triple, or an empty list
            when the entity's type is not in the ontology subset
        """
        # Check the class first so unknown-typed entities are not needlessly
        # registered in the URI registry (previously registration happened
        # before this check).
        class_uri = self._get_class_uri(entity.type)
        if not class_uri:
            logger.warning(f"Unknown entity type '{entity.type}', skipping entity '{entity.entity}'")
            return []

        # Get or create URI for this entity
        entity_uri = self.entity_registry.get_or_create_uri(
            entity.entity,
            entity.type
        )

        return [
            # entity rdf:type ClassURI
            Triple(
                s=Value(value=entity_uri, is_uri=True),
                p=Value(value=RDF_TYPE, is_uri=True),
                o=Value(value=class_uri, is_uri=True)
            ),
            # entity rdfs:label "entity name"
            Triple(
                s=Value(value=entity_uri, is_uri=True),
                p=Value(value=RDF_LABEL, is_uri=True),
                o=Value(value=entity.entity, is_uri=False)  # Literal!
            ),
        ]

    def convert_relationship(self, relationship: Relationship) -> Optional[Triple]:
        """Convert relationship to RDF triple.

        Args:
            relationship: Relationship with subject/object entities and relation

        Returns:
            Triple connecting two entity URIs via property URI, or None if
            the relation is not a known object property
        """
        # Get URIs for subject and object entities
        subject_uri = self.entity_registry.get_or_create_uri(
            relationship.subject,
            relationship.subject_type
        )
        object_uri = self.entity_registry.get_or_create_uri(
            relationship.object,
            relationship.object_type
        )

        # Look up property URI from ontology
        property_uri = self._get_object_property_uri(relationship.relation)
        if not property_uri:
            logger.warning(f"Unknown relationship '{relationship.relation}', skipping")
            return None

        # subject property object
        return Triple(
            s=Value(value=subject_uri, is_uri=True),
            p=Value(value=property_uri, is_uri=True),
            o=Value(value=object_uri, is_uri=True)
        )

    def convert_attribute(self, attribute: Attribute) -> Optional[Triple]:
        """Convert attribute to RDF triple.

        Args:
            attribute: Attribute with entity, attribute name, and literal value

        Returns:
            Triple with entity URI, property URI, and literal value, or None
            if the attribute is not a known datatype property
        """
        # Get URI for entity
        entity_uri = self.entity_registry.get_or_create_uri(
            attribute.entity,
            attribute.entity_type
        )

        # Look up property URI from ontology
        property_uri = self._get_datatype_property_uri(attribute.attribute)
        if not property_uri:
            logger.warning(f"Unknown attribute '{attribute.attribute}', skipping")
            return None

        # entity property "literal value"
        return Triple(
            s=Value(value=entity_uri, is_uri=True),
            p=Value(value=property_uri, is_uri=True),
            o=Value(value=attribute.value, is_uri=False)  # Literal!
        )

    def _get_class_uri(self, class_id: str) -> Optional[str]:
        """Get full URI for ontology class.

        Args:
            class_id: Class identifier (e.g., "fo/Recipe")

        Returns:
            Full class URI or None if not found
        """
        return self._resolve_uri(self.ontology_subset.classes, class_id)

    def _get_object_property_uri(self, property_id: str) -> Optional[str]:
        """Get full URI for object property.

        Args:
            property_id: Property identifier (e.g., "fo/has_ingredient")

        Returns:
            Full property URI or None if not found
        """
        return self._resolve_uri(self.ontology_subset.object_properties, property_id)

    def _get_datatype_property_uri(self, property_id: str) -> Optional[str]:
        """Get full URI for datatype property.

        Args:
            property_id: Property identifier (e.g., "fo/serves")

        Returns:
            Full property URI or None if not found
        """
        return self._resolve_uri(self.ontology_subset.datatype_properties, property_id)

    def _resolve_uri(self, definitions, identifier: str) -> Optional[str]:
        """Shared lookup for class/property URIs in an ontology mapping.

        Args:
            definitions: Mapping of identifier -> definition (dict with an
                optional 'uri' key, or another definition form)
            identifier: Class or property identifier to resolve

        Returns:
            Full URI or None if the identifier is not in the mapping
        """
        if identifier not in definitions:
            return None
        definition = definitions[identifier]
        # Prefer the explicit URI from the definition when present
        if isinstance(definition, dict) and 'uri' in definition:
            return definition['uri']
        # Fallback: construct URI from the ontology id and identifier
        return f"https://trustgraph.ai/ontology/{self.ontology_id}#{identifier}"