Fix/extraction prov (#662)

Quoted triple fixes, including...

1. Updated triple_provenance_triples() in triples.py:
   - Now accepts a Triple object directly
   - Creates the reification triple using the TRIPLE term type: stmt_uri tg:reifies <<extracted_triple>> (see the sketch after this list)
   - Includes it in the returned provenance triples
    
2. Updated definitions extractor:
   - Added imports for provenance functions and component version
   - Added ParameterSpec for optional llm-model and ontology flow parameters
   - For each definition triple, generates provenance with reification
    
3. Updated relationships extractor:
   - Same changes as the definitions extractor
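
As a rough illustration of the reification shape from item 1, here is a minimal sketch of how the stmt_uri tg:reifies <<extracted_triple>> statement can be built with the TRIPLE term type. This is hypothetical, not the code in triples.py: the import path and the tg:reifies IRI are placeholders, and only the schema types and the call-site parameters (stmt_uri, extracted_triple, chunk_uri, component_name, component_version, llm_model, ontology_uri) are taken from the diffs below.

    # Hypothetical sketch of the reification triple; the import path and the
    # tg:reifies IRI below are placeholders, not the constants used by triples.py.
    from trustgraph.schema import Term, Triple, IRI, TRIPLE  # assumed import path

    TG_REIFIES = "https://example.org/tg/reifies"  # placeholder for the real tg:reifies IRI

    def reification_triple(stmt_uri, extracted_triple):
        # stmt_uri tg:reifies <<extracted_triple>>: the quoted triple sits in the
        # object position as a TRIPLE-typed Term (RDF-star style reification).
        return Triple(
            s=Term(type=IRI, iri=stmt_uri),
            p=Term(type=IRI, iri=TG_REIFIES),
            o=Term(type=TRIPLE, triple=extracted_triple),
        )
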
cybermaggedon 2026-03-06 12:23:58 +00:00 committed by GitHub
parent cd5580be59
commit 2b9232917c
19 changed files with 361 additions and 72 deletions

View file

@@ -90,8 +90,14 @@ class EntityVectors:
max_length=65535,
)
chunk_id_field = FieldSchema(
name="chunk_id",
dtype=DataType.VARCHAR,
max_length=65535,
)
schema = CollectionSchema(
fields = [pkey_field, vec_field, entity_field],
fields = [pkey_field, vec_field, entity_field, chunk_id_field],
description = "Graph embedding schema",
)
@@ -119,17 +125,18 @@ class EntityVectors:
self.collections[(dimension, user, collection)] = collection_name
logger.info(f"Created Milvus collection {collection_name} with dimension {dimension}")
def insert(self, embeds, entity, user, collection):
def insert(self, embeds, entity, user, collection, chunk_id=""):
dim = len(embeds)
if (dim, user, collection) not in self.collections:
self.init_collection(dim, user, collection)
data = [
{
"vector": embeds,
"entity": entity,
"chunk_id": chunk_id,
}
]
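
The new chunk_id parameter defaults to an empty string, so existing callers of insert() keep working unchanged; the graph-embeddings store diff further down passes entity.chunk_id explicitly. A hypothetical caller-side sketch (vectors, embeds, entity, user, collection and chunk_uri are assumed local names, not repo code):

    # Hypothetical usage sketch of the extended insert(); names are illustrative only.
    vectors.insert(embeds, entity, user, collection)                      # old call sites: chunk_id defaults to ""
    vectors.insert(embeds, entity, user, collection, chunk_id=chunk_uri)  # new call sites tie the vector to its chunk
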

View file

@@ -18,7 +18,10 @@ from .... schema import PromptRequest, PromptResponse
from .... rdf import TRUSTGRAPH_ENTITIES, DEFINITION, RDF_LABEL, SUBJECT_OF
from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
from .... base import PromptClientSpec
from .... base import PromptClientSpec, ParameterSpec
from .... provenance import statement_uri, triple_provenance_triples
from .... flow_version import __version__ as COMPONENT_VERSION
DEFINITION_VALUE = Term(type=IRI, iri=DEFINITION)
RDF_LABEL_VALUE = Term(type=IRI, iri=RDF_LABEL)
@@ -75,6 +78,10 @@ class Processor(FlowProcessor):
)
)
# Optional flow parameters for provenance
self.register_specification(ParameterSpec("llm-model"))
self.register_specification(ParameterSpec("ontology"))
def to_uri(self, text):
part = text.replace(" ", "-").lower().encode("utf-8")
@@ -132,6 +139,10 @@ class Processor(FlowProcessor):
chunk_doc_id = v.document_id if v.document_id else v.metadata.id
chunk_uri = v.metadata.id # The URI form for the chunk
# Get optional provenance parameters
llm_model = flow("llm-model")
ontology_uri = flow("ontology")
# Note: Document metadata is now emitted once by librarian at processing
# initiation, so we don't need to duplicate it here.
@@ -157,9 +168,24 @@ class Processor(FlowProcessor):
o=Term(type=LITERAL, value=s),
))
triples.append(Triple(
# The definition triple - this is the main extracted fact
definition_triple = Triple(
s=s_value, p=DEFINITION_VALUE, o=o_value
))
)
triples.append(definition_triple)
# Generate provenance for the definition triple (reification)
stmt_uri = statement_uri()
prov_triples = triple_provenance_triples(
stmt_uri=stmt_uri,
extracted_triple=definition_triple,
chunk_uri=chunk_uri,
component_name=default_ident,
component_version=COMPONENT_VERSION,
llm_model=llm_model,
ontology_uri=ontology_uri,
)
triples.extend(prov_triples)
# Link entity to chunk (not top-level document)
triples.append(Triple(

View file

@@ -18,7 +18,10 @@ from .... schema import PromptRequest, PromptResponse
from .... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES, SUBJECT_OF
from .... base import FlowProcessor, ConsumerSpec, ProducerSpec
from .... base import PromptClientSpec
from .... base import PromptClientSpec, ParameterSpec
from .... provenance import statement_uri, triple_provenance_triples
from .... flow_version import __version__ as COMPONENT_VERSION
RDF_LABEL_VALUE = Term(type=IRI, iri=RDF_LABEL)
SUBJECT_OF_VALUE = Term(type=IRI, iri=SUBJECT_OF)
@@ -65,6 +68,10 @@ class Processor(FlowProcessor):
)
)
# Optional flow parameters for provenance
self.register_specification(ParameterSpec("llm-model"))
self.register_specification(ParameterSpec("ontology"))
def to_uri(self, text):
part = text.replace(" ", "-").lower().encode("utf-8")
@@ -113,6 +120,10 @@ class Processor(FlowProcessor):
chunk_doc_id = v.document_id if v.document_id else v.metadata.id
chunk_uri = v.metadata.id # The URI form for the chunk
# Get optional provenance parameters
llm_model = flow("llm-model")
ontology_uri = flow("ontology")
# Note: Document metadata is now emitted once by librarian at processing
# initiation, so we don't need to duplicate it here.
@@ -142,11 +153,26 @@ class Processor(FlowProcessor):
else:
o_value = Term(type=LITERAL, value=str(o))
triples.append(Triple(
# The relationship triple - this is the main extracted fact
relationship_triple = Triple(
s=s_value,
p=p_value,
o=o_value
))
)
triples.append(relationship_triple)
# Generate provenance for the relationship triple (reification)
stmt_uri = statement_uri()
prov_triples = triple_provenance_triples(
stmt_uri=stmt_uri,
extracted_triple=relationship_triple,
chunk_uri=chunk_uri,
component_name=default_ident,
component_version=COMPONENT_VERSION,
llm_model=llm_model,
ontology_uri=ontology_uri,
)
triples.extend(prov_triples)
# Label for s
triples.append(Triple(

View file

@@ -6,11 +6,13 @@ null. Output is a list of quads.
import logging
import json
from .... direct.cassandra_kg import (
EntityCentricKnowledgeGraph, GRAPH_WILDCARD, DEFAULT_GRAPH
)
from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error
from .... schema import Term, Triple, IRI, LITERAL
from .... schema import Term, Triple, IRI, LITERAL, TRIPLE
from .... base import TriplesQueryService
from .... base.cassandra_config import add_cassandra_args, resolve_cassandra_config
@@ -33,6 +35,36 @@ def get_term_value(term):
return term.id or term.value
def deserialize_term(term_dict):
"""Deserialize a term from JSON structure."""
if term_dict is None:
return None
term_type = term_dict.get("type", "")
if term_type == IRI:
return Term(type=IRI, iri=term_dict.get("iri", ""))
elif term_type == LITERAL:
return Term(
type=LITERAL,
value=term_dict.get("value", ""),
datatype=term_dict.get("datatype", ""),
language=term_dict.get("language", "")
)
elif term_type == TRIPLE:
# Recursive for nested triples
nested = term_dict.get("triple")
if nested:
return Term(
type=TRIPLE,
triple=Triple(
s=deserialize_term(nested.get("s")),
p=deserialize_term(nested.get("p")),
o=deserialize_term(nested.get("o")),
)
)
# Fallback
return Term(type=LITERAL, value=str(term_dict))
def create_term(value, otype=None, dtype=None, lang=None):
"""
Create a Term from a string value, optionally using type metadata.
@@ -57,8 +89,22 @@ def create_term(value, otype=None, dtype=None, lang=None):
language=lang or ""
)
elif otype == 't':
# Triple/reification - treat as IRI for now
return Term(type=IRI, iri=value)
# Triple/reification - parse JSON and create nested Triple
try:
triple_data = json.loads(value) if isinstance(value, str) else value
if isinstance(triple_data, dict):
return Term(
type=TRIPLE,
triple=Triple(
s=deserialize_term(triple_data.get("s")),
p=deserialize_term(triple_data.get("p")),
o=deserialize_term(triple_data.get("o")),
)
)
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"Failed to parse triple JSON: {e}")
# Fallback if parsing fails
return Term(type=LITERAL, value=str(value))
else:
# Unknown otype, fall back to heuristic
pass

View file

@@ -57,7 +57,8 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
self.vecstore.insert(
vec, entity_value,
message.metadata.user,
message.metadata.collection
message.metadata.collection,
chunk_id=entity.chunk_id or "",
)
@staticmethod

View file

@@ -137,11 +137,15 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
# Generate unique ID for each vector
vector_id = str(uuid.uuid4())
metadata = {"entity": entity_value}
if entity.chunk_id:
metadata["chunk_id"] = entity.chunk_id
records = [
{
"id": vector_id,
"values": vec,
"metadata": { "entity": entity_value },
"metadata": metadata,
}
]

View file

@@ -90,15 +90,19 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
)
)
payload = {
"entity": entity_value,
}
if entity.chunk_id:
payload["chunk_id"] = entity.chunk_id
self.qdrant.upsert(
collection_name=collection,
points=[
PointStruct(
id=str(uuid.uuid4()),
vector=vec,
payload={
"entity": entity_value,
}
payload=payload,
)
]
)

View file

@@ -9,6 +9,7 @@ import os
import argparse
import time
import logging
import json
from .... direct.cassandra_kg import (
EntityCentricKnowledgeGraph, DEFAULT_GRAPH
@@ -25,6 +26,37 @@ logger = logging.getLogger(__name__)
default_ident = "triples-write"
def serialize_triple(triple):
"""Serialize a Triple object to JSON for storage."""
if triple is None:
return None
def term_to_dict(term):
if term is None:
return None
result = {"type": term.type}
if term.type == IRI:
result["iri"] = term.iri
elif term.type == LITERAL:
result["value"] = term.value
if term.datatype:
result["datatype"] = term.datatype
if term.language:
result["language"] = term.language
elif term.type == BLANK:
result["id"] = term.id
elif term.type == TRIPLE:
result["triple"] = serialize_triple(term.triple)
return result
return json.dumps({
"s": term_to_dict(triple.s),
"p": term_to_dict(triple.p),
"o": term_to_dict(triple.o),
})
def get_term_value(term):
"""Extract the string value from a Term"""
if term is None:
@@ -33,6 +65,9 @@ def get_term_value(term):
return term.iri
elif term.type == LITERAL:
return term.value
elif term.type == TRIPLE:
# Serialize nested triple as JSON
return serialize_triple(term.triple)
else:
# For blank nodes or other types, use id or value
return term.id or term.value
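
Taken together, the last two diffs give a write/read round trip for quoted triples in the Cassandra store: on the write path get_term_value() serializes a nested TRIPLE term to a JSON string via serialize_triple(), and on the read path create_term(value, otype='t') parses that JSON back into a Term(type=TRIPLE, ...) via deserialize_term(). Below is a conceptual round-trip sketch, assuming the schema types shown above; note that serialize_triple() and create_term() live in the triples-write and triples-query services respectively, so this is not runnable as a single file without importing both.

    # Conceptual round-trip sketch; the IRIs are placeholders, and serialize_triple /
    # create_term come from the write and query service modules shown above.
    quoted = Triple(
        s=Term(type=IRI, iri="http://example.org/e/cat"),
        p=Term(type=IRI, iri="http://example.org/e/definition"),
        o=Term(type=LITERAL, value="A small domesticated felid"),
    )
    stored = serialize_triple(quoted)           # write path: nested Triple -> JSON string for storage
    restored = create_term(stored, otype='t')   # read path: JSON string -> Term(type=TRIPLE, ...)
    assert restored.type == TRIPLE
    assert restored.triple.o.value == "A small domesticated felid"
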