From ca626c8471540cf802e6546a9fa087b0a061cf89 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Mon, 9 Feb 2026 14:07:07 +0000 Subject: [PATCH] Protect from null embeddings in cores (#626) * Don't emit graph embeddings if there aren't any. * Don't store graph embeddings in a knowledge store if there's an empty list. * Translate between Cassandra's 'null' representing an empty list and an empty list which is what the surrounding code wants (and stored in the first place). * Avoid emitting empty embedding lists * Avoid output empty triple lists * Fix tests --- .../test_kg_extract_store_integration.py | 19 +++++--- .../embeddings/graph_embeddings/embeddings.py | 11 ++--- .../extract/kg/definitions/extract.py | 42 +++++++++--------- .../trustgraph/extract/kg/ontology/extract.py | 44 +++++-------------- .../extract/kg/relationships/extract.py | 21 ++++----- .../trustgraph/storage/knowledge/store.py | 6 ++- .../trustgraph/tables/knowledge.py | 36 ++++++++------- 7 files changed, 88 insertions(+), 91 deletions(-) diff --git a/tests/integration/test_kg_extract_store_integration.py b/tests/integration/test_kg_extract_store_integration.py index dd13789f..c22524fd 100644 --- a/tests/integration/test_kg_extract_store_integration.py +++ b/tests/integration/test_kg_extract_store_integration.py @@ -16,7 +16,7 @@ from trustgraph.extract.kg.definitions.extract import Processor as DefinitionsPr from trustgraph.extract.kg.relationships.extract import Processor as RelationshipsProcessor from trustgraph.storage.knowledge.store import Processor as KnowledgeStoreProcessor from trustgraph.schema import Chunk, Triple, Triples, Metadata, Value, Error -from trustgraph.schema import EntityContext, EntityContexts, GraphEmbeddings +from trustgraph.schema import EntityContext, EntityContexts, GraphEmbeddings, EntityEmbeddings from trustgraph.rdf import TRUSTGRAPH_ENTITIES, DEFINITION, RDF_LABEL, SUBJECT_OF @@ -405,9 +405,14 @@ class TestKnowledgeGraphPipelineIntegration: collection="test_collection", metadata=[] ), - entities=[] + entities=[ + EntityEmbeddings( + entity=Value(value="http://example.org/entity", is_uri=True), + vectors=[[0.1, 0.2, 0.3]] + ) + ] ) - + mock_msg = MagicMock() mock_msg.value.return_value = sample_embeddings @@ -496,12 +501,12 @@ class TestKnowledgeGraphPipelineIntegration: await definitions_processor.on_message(mock_msg, mock_consumer, mock_flow_context) # Assert - # Should still call producers but with empty results + # Should NOT call producers with empty results (avoids Cassandra NULL issues) triples_producer = mock_flow_context("triples") entity_contexts_producer = mock_flow_context("entity-contexts") - - triples_producer.send.assert_called_once() - entity_contexts_producer.send.assert_called_once() + + triples_producer.send.assert_not_called() + entity_contexts_producer.send.assert_not_called() @pytest.mark.asyncio async def test_invalid_extraction_format_handling(self, definitions_processor, mock_flow_context, sample_chunk): diff --git a/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py b/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py index 4726be4d..7b2c779b 100755 --- a/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py +++ b/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py @@ -73,12 +73,13 @@ class Processor(FlowProcessor): ) ) - r = GraphEmbeddings( - metadata=v.metadata, - entities=entities, - ) + if entities: + r = GraphEmbeddings( + metadata=v.metadata, + entities=entities, + ) - await flow("output").send(r) + await flow("output").send(r) except Exception as e: logger.error("Exception occurred", exc_info=True) diff --git a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py index 1d414b7e..2693e456 100755 --- a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py @@ -168,27 +168,29 @@ class Processor(FlowProcessor): entities.append(ec) - await self.emit_triples( - flow("triples"), - Metadata( - id=v.metadata.id, - metadata=[], - user=v.metadata.user, - collection=v.metadata.collection, - ), - triples - ) + if triples: + await self.emit_triples( + flow("triples"), + Metadata( + id=v.metadata.id, + metadata=[], + user=v.metadata.user, + collection=v.metadata.collection, + ), + triples + ) - await self.emit_ecs( - flow("entity-contexts"), - Metadata( - id=v.metadata.id, - metadata=[], - user=v.metadata.user, - collection=v.metadata.collection, - ), - entities - ) + if entities: + await self.emit_ecs( + flow("entity-contexts"), + Metadata( + id=v.metadata.id, + metadata=[], + user=v.metadata.user, + collection=v.metadata.collection, + ), + entities + ) except Exception as e: logger.error(f"Definitions extraction exception: {e}", exc_info=True) diff --git a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py index 335f07d2..d169fce4 100644 --- a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py @@ -274,17 +274,6 @@ class Processor(FlowProcessor): if not ontology_subsets: logger.warning("No relevant ontology elements found for chunk") - # Emit empty outputs - await self.emit_triples( - flow("triples"), - v.metadata, - [] - ) - await self.emit_entity_contexts( - flow("entity-contexts"), - v.metadata, - [] - ) return # Merge subsets if multiple ontologies matched @@ -319,35 +308,26 @@ class Processor(FlowProcessor): entity_contexts = self.build_entity_contexts(all_triples) # Emit all triples (extracted + ontology definitions) - await self.emit_triples( - flow("triples"), - v.metadata, - all_triples - ) + if all_triples: + await self.emit_triples( + flow("triples"), + v.metadata, + all_triples + ) # Emit entity contexts - await self.emit_entity_contexts( - flow("entity-contexts"), - v.metadata, - entity_contexts - ) + if entity_contexts: + await self.emit_entity_contexts( + flow("entity-contexts"), + v.metadata, + entity_contexts + ) logger.info(f"Extracted {len(triples)} content triples + {len(ontology_triples)} ontology triples " f"= {len(all_triples)} total triples and {len(entity_contexts)} entity contexts") except Exception as e: logger.error(f"OntoRAG extraction exception: {e}", exc_info=True) - # Emit empty outputs on error - await self.emit_triples( - flow("triples"), - v.metadata, - [] - ) - await self.emit_entity_contexts( - flow("entity-contexts"), - v.metadata, - [] - ) async def extract_with_simplified_format( self, diff --git a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py index 6d461997..a8129631 100755 --- a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py @@ -181,16 +181,17 @@ class Processor(FlowProcessor): o=Value(value=v.metadata.id, is_uri=True) )) - await self.emit_triples( - flow("triples"), - Metadata( - id=v.metadata.id, - metadata=[], - user=v.metadata.user, - collection=v.metadata.collection, - ), - triples - ) + if triples: + await self.emit_triples( + flow("triples"), + Metadata( + id=v.metadata.id, + metadata=[], + user=v.metadata.user, + collection=v.metadata.collection, + ), + triples + ) except Exception as e: logger.error(f"Relationship extraction exception: {e}", exc_info=True) diff --git a/trustgraph-flow/trustgraph/storage/knowledge/store.py b/trustgraph-flow/trustgraph/storage/knowledge/store.py index a79b7b83..475604b6 100644 --- a/trustgraph-flow/trustgraph/storage/knowledge/store.py +++ b/trustgraph-flow/trustgraph/storage/knowledge/store.py @@ -64,12 +64,14 @@ class Processor(FlowProcessor): async def on_triples(self, msg, consumer, flow): v = msg.value() - await self.table_store.add_triples(v) + if v.triples: + await self.table_store.add_triples(v) async def on_graph_embeddings(self, msg, consumer, flow): v = msg.value() - await self.table_store.add_graph_embeddings(v) + if v.entities: + await self.table_store.add_graph_embeddings(v) @staticmethod def add_args(parser): diff --git a/trustgraph-flow/trustgraph/tables/knowledge.py b/trustgraph-flow/trustgraph/tables/knowledge.py index 1ee61088..ffd54d63 100644 --- a/trustgraph-flow/trustgraph/tables/knowledge.py +++ b/trustgraph-flow/trustgraph/tables/knowledge.py @@ -423,14 +423,17 @@ class KnowledgeTableStore: else: metadata = [] - triples = [ - Triple( - s = Value(value = elt[0], is_uri = elt[1]), - p = Value(value = elt[2], is_uri = elt[3]), - o = Value(value = elt[4], is_uri = elt[5]), - ) - for elt in row[3] - ] + if row[3]: + triples = [ + Triple( + s = Value(value = elt[0], is_uri = elt[1]), + p = Value(value = elt[2], is_uri = elt[3]), + o = Value(value = elt[4], is_uri = elt[5]), + ) + for elt in row[3] + ] + else: + triples = [] await receiver( Triples( @@ -479,13 +482,16 @@ class KnowledgeTableStore: else: metadata = [] - entities = [ - EntityEmbeddings( - entity = Value(value = ent[0][0], is_uri = ent[0][1]), - vectors = ent[1] - ) - for ent in row[3] - ] + if row[3]: + entities = [ + EntityEmbeddings( + entity = Value(value = ent[0][0], is_uri = ent[0][1]), + vectors = ent[1] + ) + for ent in row[3] + ] + else: + entities = [] await receiver( GraphEmbeddings(