Protect null embeddings - v2.0 (#627)

* Don't emit graph embeddings if there aren't any.

* Don't store graph embeddings in the knowledge store if the list is empty.

* Translate between Cassandra's 'null', which it returns in place of an
  empty list, and the empty list that the surrounding code expects (and
  that was stored in the first place).

* Avoid emitting empty embedding lists

* Avoid outputting empty triple lists

* Fix tests
This commit is contained in:
cybermaggedon 2026-02-09 14:57:36 +00:00 committed by GitHub
parent 98827e5561
commit 8574861196
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 88 additions and 91 deletions

View file

@ -16,7 +16,7 @@ from trustgraph.extract.kg.definitions.extract import Processor as DefinitionsPr
from trustgraph.extract.kg.relationships.extract import Processor as RelationshipsProcessor from trustgraph.extract.kg.relationships.extract import Processor as RelationshipsProcessor
from trustgraph.storage.knowledge.store import Processor as KnowledgeStoreProcessor from trustgraph.storage.knowledge.store import Processor as KnowledgeStoreProcessor
from trustgraph.schema import Chunk, Triple, Triples, Metadata, Term, Error, IRI, LITERAL from trustgraph.schema import Chunk, Triple, Triples, Metadata, Term, Error, IRI, LITERAL
from trustgraph.schema import EntityContext, EntityContexts, GraphEmbeddings from trustgraph.schema import EntityContext, EntityContexts, GraphEmbeddings, EntityEmbeddings
from trustgraph.rdf import TRUSTGRAPH_ENTITIES, DEFINITION, RDF_LABEL, SUBJECT_OF from trustgraph.rdf import TRUSTGRAPH_ENTITIES, DEFINITION, RDF_LABEL, SUBJECT_OF
@ -405,7 +405,12 @@ class TestKnowledgeGraphPipelineIntegration:
collection="test_collection", collection="test_collection",
metadata=[] metadata=[]
), ),
entities=[] entities=[
EntityEmbeddings(
entity=Term(type=IRI, iri="http://example.org/entity"),
vectors=[[0.1, 0.2, 0.3]]
)
]
) )
mock_msg = MagicMock() mock_msg = MagicMock()
@ -496,12 +501,12 @@ class TestKnowledgeGraphPipelineIntegration:
await definitions_processor.on_message(mock_msg, mock_consumer, mock_flow_context) await definitions_processor.on_message(mock_msg, mock_consumer, mock_flow_context)
# Assert # Assert
# Should still call producers but with empty results # Should NOT call producers with empty results (avoids Cassandra NULL issues)
triples_producer = mock_flow_context("triples") triples_producer = mock_flow_context("triples")
entity_contexts_producer = mock_flow_context("entity-contexts") entity_contexts_producer = mock_flow_context("entity-contexts")
triples_producer.send.assert_called_once() triples_producer.send.assert_not_called()
entity_contexts_producer.send.assert_called_once() entity_contexts_producer.send.assert_not_called()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_invalid_extraction_format_handling(self, definitions_processor, mock_flow_context, sample_chunk): async def test_invalid_extraction_format_handling(self, definitions_processor, mock_flow_context, sample_chunk):

View file

@ -73,6 +73,7 @@ class Processor(FlowProcessor):
) )
) )
if entities:
r = GraphEmbeddings( r = GraphEmbeddings(
metadata=v.metadata, metadata=v.metadata,
entities=entities, entities=entities,

View file

@ -168,6 +168,7 @@ class Processor(FlowProcessor):
entities.append(ec) entities.append(ec)
if triples:
await self.emit_triples( await self.emit_triples(
flow("triples"), flow("triples"),
Metadata( Metadata(
@ -179,6 +180,7 @@ class Processor(FlowProcessor):
triples triples
) )
if entities:
await self.emit_ecs( await self.emit_ecs(
flow("entity-contexts"), flow("entity-contexts"),
Metadata( Metadata(

View file

@ -282,17 +282,6 @@ class Processor(FlowProcessor):
if not ontology_subsets: if not ontology_subsets:
logger.warning("No relevant ontology elements found for chunk") logger.warning("No relevant ontology elements found for chunk")
# Emit empty outputs
await self.emit_triples(
flow("triples"),
v.metadata,
[]
)
await self.emit_entity_contexts(
flow("entity-contexts"),
v.metadata,
[]
)
return return
# Merge subsets if multiple ontologies matched # Merge subsets if multiple ontologies matched
@ -327,6 +316,7 @@ class Processor(FlowProcessor):
entity_contexts = self.build_entity_contexts(all_triples) entity_contexts = self.build_entity_contexts(all_triples)
# Emit all triples (extracted + ontology definitions) # Emit all triples (extracted + ontology definitions)
if all_triples:
await self.emit_triples( await self.emit_triples(
flow("triples"), flow("triples"),
v.metadata, v.metadata,
@ -334,6 +324,7 @@ class Processor(FlowProcessor):
) )
# Emit entity contexts # Emit entity contexts
if entity_contexts:
await self.emit_entity_contexts( await self.emit_entity_contexts(
flow("entity-contexts"), flow("entity-contexts"),
v.metadata, v.metadata,
@ -345,17 +336,6 @@ class Processor(FlowProcessor):
except Exception as e: except Exception as e:
logger.error(f"OntoRAG extraction exception: {e}", exc_info=True) logger.error(f"OntoRAG extraction exception: {e}", exc_info=True)
# Emit empty outputs on error
await self.emit_triples(
flow("triples"),
v.metadata,
[]
)
await self.emit_entity_contexts(
flow("entity-contexts"),
v.metadata,
[]
)
async def extract_with_simplified_format( async def extract_with_simplified_format(
self, self,

View file

@ -181,6 +181,7 @@ class Processor(FlowProcessor):
o=Term(type=IRI, iri=v.metadata.id) o=Term(type=IRI, iri=v.metadata.id)
)) ))
if triples:
await self.emit_triples( await self.emit_triples(
flow("triples"), flow("triples"),
Metadata( Metadata(

View file

@ -64,11 +64,13 @@ class Processor(FlowProcessor):
async def on_triples(self, msg, consumer, flow): async def on_triples(self, msg, consumer, flow):
v = msg.value() v = msg.value()
if v.triples:
await self.table_store.add_triples(v) await self.table_store.add_triples(v)
async def on_graph_embeddings(self, msg, consumer, flow): async def on_graph_embeddings(self, msg, consumer, flow):
v = msg.value() v = msg.value()
if v.entities:
await self.table_store.add_graph_embeddings(v) await self.table_store.add_graph_embeddings(v)
@staticmethod @staticmethod

View file

@ -435,6 +435,7 @@ class KnowledgeTableStore:
else: else:
metadata = [] metadata = []
if row[3]:
triples = [ triples = [
Triple( Triple(
s = tuple_to_term(elt[0], elt[1]), s = tuple_to_term(elt[0], elt[1]),
@ -443,6 +444,8 @@ class KnowledgeTableStore:
) )
for elt in row[3] for elt in row[3]
] ]
else:
triples = []
await receiver( await receiver(
Triples( Triples(
@ -491,6 +494,7 @@ class KnowledgeTableStore:
else: else:
metadata = [] metadata = []
if row[3]:
entities = [ entities = [
EntityEmbeddings( EntityEmbeddings(
entity = tuple_to_term(ent[0][0], ent[0][1]), entity = tuple_to_term(ent[0][0], ent[0][1]),
@ -498,6 +502,8 @@ class KnowledgeTableStore:
) )
for ent in row[3] for ent in row[3]
] ]
else:
entities = []
await receiver( await receiver(
GraphEmbeddings( GraphEmbeddings(