diff --git a/tests/integration/test_kg_extract_store_integration.py b/tests/integration/test_kg_extract_store_integration.py index 9131f1ee..2baa1d4d 100644 --- a/tests/integration/test_kg_extract_store_integration.py +++ b/tests/integration/test_kg_extract_store_integration.py @@ -147,6 +147,8 @@ class TestKnowledgeGraphPipelineIntegration: processor.emit_triples = DefinitionsProcessor.emit_triples.__get__(processor, DefinitionsProcessor) processor.emit_ecs = DefinitionsProcessor.emit_ecs.__get__(processor, DefinitionsProcessor) processor.on_message = DefinitionsProcessor.on_message.__get__(processor, DefinitionsProcessor) + processor.triples_batch_size = 50 + processor.entity_batch_size = 5 return processor @pytest.fixture @@ -156,6 +158,7 @@ class TestKnowledgeGraphPipelineIntegration: processor.to_uri = RelationshipsProcessor.to_uri.__get__(processor, RelationshipsProcessor) processor.emit_triples = RelationshipsProcessor.emit_triples.__get__(processor, RelationshipsProcessor) processor.on_message = RelationshipsProcessor.on_message.__get__(processor, RelationshipsProcessor) + processor.triples_batch_size = 50 return processor @pytest.mark.asyncio diff --git a/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py b/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py index 7b2c779b..1b63774d 100755 --- a/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py +++ b/trustgraph-flow/trustgraph/embeddings/graph_embeddings/embeddings.py @@ -16,12 +16,14 @@ import logging logger = logging.getLogger(__name__) default_ident = "graph-embeddings" +default_batch_size = 5 class Processor(FlowProcessor): def __init__(self, **params): id = params.get("id") + self.batch_size = params.get("batch_size", default_batch_size) super(Processor, self).__init__( **params | { @@ -73,12 +75,13 @@ class Processor(FlowProcessor): ) ) - if entities: + # Send in batches to avoid oversized messages + for i in range(0, len(entities), self.batch_size): + batch = entities[i:i + self.batch_size] r = GraphEmbeddings( metadata=v.metadata, - entities=entities, + entities=batch, ) - await flow("output").send(r) except Exception as e: @@ -92,6 +95,13 @@ class Processor(FlowProcessor): @staticmethod def add_args(parser): + parser.add_argument( + '--batch-size', + type=int, + default=default_batch_size, + help=f'Maximum entities per output message (default: {default_batch_size})' + ) + FlowProcessor.add_args(parser) def run(): diff --git a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py index 9d289e2d..72275a8c 100755 --- a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py @@ -26,6 +26,8 @@ SUBJECT_OF_VALUE = Term(type=IRI, iri=SUBJECT_OF) default_ident = "kg-extract-definitions" default_concurrency = 1 +default_triples_batch_size = 50 +default_entity_batch_size = 5 class Processor(FlowProcessor): @@ -33,6 +35,8 @@ class Processor(FlowProcessor): id = params.get("id") concurrency = params.get("concurrency", 1) + self.triples_batch_size = params.get("triples_batch_size", default_triples_batch_size) + self.entity_batch_size = params.get("entity_batch_size", default_entity_batch_size) super(Processor, self).__init__( **params | { @@ -173,7 +177,9 @@ class Processor(FlowProcessor): context=defn["definition"], )) - if triples: + # Send triples in batches + for i in range(0, len(triples), self.triples_batch_size): + batch = triples[i:i + self.triples_batch_size] await self.emit_triples( flow("triples"), Metadata( @@ -182,10 +188,12 @@ class Processor(FlowProcessor): user=v.metadata.user, collection=v.metadata.collection, ), - triples + batch ) - if entities: + # Send entity contexts in batches + for i in range(0, len(entities), self.entity_batch_size): + batch = entities[i:i + self.entity_batch_size] await self.emit_ecs( flow("entity-contexts"), Metadata( @@ -194,7 +202,7 @@ class Processor(FlowProcessor): user=v.metadata.user, collection=v.metadata.collection, ), - entities + batch ) except Exception as e: @@ -212,6 +220,20 @@ class Processor(FlowProcessor): help=f'Concurrent processing threads (default: {default_concurrency})' ) + parser.add_argument( + '--triples-batch-size', + type=int, + default=default_triples_batch_size, + help=f'Maximum triples per output message (default: {default_triples_batch_size})' + ) + + parser.add_argument( + '--entity-batch-size', + type=int, + default=default_entity_batch_size, + help=f'Maximum entity contexts per output message (default: {default_entity_batch_size})' + ) + FlowProcessor.add_args(parser) def run(): diff --git a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py index 1644f729..a0d9a3fe 100644 --- a/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/ontology/extract.py @@ -27,6 +27,8 @@ logger = logging.getLogger(__name__) default_ident = "kg-extract-ontology" default_concurrency = 1 +default_triples_batch_size = 50 +default_entity_batch_size = 5 # URI prefix mappings for common namespaces URI_PREFIXES = { @@ -53,6 +55,8 @@ class Processor(FlowProcessor): def __init__(self, **params): id = params.get("id", default_ident) concurrency = params.get("concurrency", default_concurrency) + self.triples_batch_size = params.get("triples_batch_size", default_triples_batch_size) + self.entity_batch_size = params.get("entity_batch_size", default_entity_batch_size) super(Processor, self).__init__( **params | { @@ -315,20 +319,22 @@ class Processor(FlowProcessor): # Build entity contexts from all triples (including ontology elements) entity_contexts = self.build_entity_contexts(all_triples) - # Emit all triples (extracted + ontology definitions) - if all_triples: + # Emit triples in batches + for i in range(0, len(all_triples), self.triples_batch_size): + batch = all_triples[i:i + self.triples_batch_size] await self.emit_triples( flow("triples"), v.metadata, - all_triples + batch ) - # Emit entity contexts - if entity_contexts: + # Emit entity contexts in batches + for i in range(0, len(entity_contexts), self.entity_batch_size): + batch = entity_contexts[i:i + self.entity_batch_size] await self.emit_entity_contexts( flow("entity-contexts"), v.metadata, - entity_contexts + batch ) logger.info(f"Extracted {len(triples)} content triples + {len(ontology_triples)} ontology triples " @@ -864,6 +870,18 @@ class Processor(FlowProcessor): default=0.3, help='Similarity threshold for ontology matching (default: 0.3, range: 0.0-1.0)' ) + parser.add_argument( + '--triples-batch-size', + type=int, + default=default_triples_batch_size, + help=f'Maximum triples per output message (default: {default_triples_batch_size})' + ) + parser.add_argument( + '--entity-batch-size', + type=int, + default=default_entity_batch_size, + help=f'Maximum entity contexts per output message (default: {default_entity_batch_size})' + ) FlowProcessor.add_args(parser) diff --git a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py index c92a33e9..7ab51555 100755 --- a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py @@ -25,6 +25,7 @@ SUBJECT_OF_VALUE = Term(type=IRI, iri=SUBJECT_OF) default_ident = "kg-extract-relationships" default_concurrency = 1 +default_triples_batch_size = 50 class Processor(FlowProcessor): @@ -32,6 +33,7 @@ class Processor(FlowProcessor): id = params.get("id") concurrency = params.get("concurrency", 1) + self.triples_batch_size = params.get("triples_batch_size", default_triples_batch_size) super(Processor, self).__init__( **params | { @@ -181,7 +183,9 @@ class Processor(FlowProcessor): o=Term(type=IRI, iri=v.metadata.id) )) - if triples: + # Send triples in batches + for i in range(0, len(triples), self.triples_batch_size): + batch = triples[i:i + self.triples_batch_size] await self.emit_triples( flow("triples"), Metadata( @@ -190,7 +194,7 @@ class Processor(FlowProcessor): user=v.metadata.user, collection=v.metadata.collection, ), - triples + batch ) except Exception as e: @@ -208,6 +212,13 @@ class Processor(FlowProcessor): help=f'Concurrent processing threads (default: {default_concurrency})' ) + parser.add_argument( + '--triples-batch-size', + type=int, + default=default_triples_batch_size, + help=f'Maximum triples per output message (default: {default_triples_batch_size})' + ) + FlowProcessor.add_args(parser) def run():