Entity & triple batch size limits (#635)

* Entities and triples are emitted in batches with a batch limit to manage
overloading downstream.

* Update tests
This commit is contained in:
cybermaggedon 2026-02-16 17:38:03 +00:00 committed by GitHub
parent fe389354f6
commit d886358be6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 79 additions and 15 deletions

View file

@ -147,6 +147,8 @@ class TestKnowledgeGraphPipelineIntegration:
processor.emit_triples = DefinitionsProcessor.emit_triples.__get__(processor, DefinitionsProcessor) processor.emit_triples = DefinitionsProcessor.emit_triples.__get__(processor, DefinitionsProcessor)
processor.emit_ecs = DefinitionsProcessor.emit_ecs.__get__(processor, DefinitionsProcessor) processor.emit_ecs = DefinitionsProcessor.emit_ecs.__get__(processor, DefinitionsProcessor)
processor.on_message = DefinitionsProcessor.on_message.__get__(processor, DefinitionsProcessor) processor.on_message = DefinitionsProcessor.on_message.__get__(processor, DefinitionsProcessor)
processor.triples_batch_size = 50
processor.entity_batch_size = 5
return processor return processor
@pytest.fixture @pytest.fixture
@ -156,6 +158,7 @@ class TestKnowledgeGraphPipelineIntegration:
processor.to_uri = RelationshipsProcessor.to_uri.__get__(processor, RelationshipsProcessor) processor.to_uri = RelationshipsProcessor.to_uri.__get__(processor, RelationshipsProcessor)
processor.emit_triples = RelationshipsProcessor.emit_triples.__get__(processor, RelationshipsProcessor) processor.emit_triples = RelationshipsProcessor.emit_triples.__get__(processor, RelationshipsProcessor)
processor.on_message = RelationshipsProcessor.on_message.__get__(processor, RelationshipsProcessor) processor.on_message = RelationshipsProcessor.on_message.__get__(processor, RelationshipsProcessor)
processor.triples_batch_size = 50
return processor return processor
@pytest.mark.asyncio @pytest.mark.asyncio

View file

@ -16,12 +16,14 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
default_ident = "graph-embeddings" default_ident = "graph-embeddings"
default_batch_size = 5
class Processor(FlowProcessor): class Processor(FlowProcessor):
def __init__(self, **params): def __init__(self, **params):
id = params.get("id") id = params.get("id")
self.batch_size = params.get("batch_size", default_batch_size)
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
@ -73,12 +75,13 @@ class Processor(FlowProcessor):
) )
) )
if entities: # Send in batches to avoid oversized messages
for i in range(0, len(entities), self.batch_size):
batch = entities[i:i + self.batch_size]
r = GraphEmbeddings( r = GraphEmbeddings(
metadata=v.metadata, metadata=v.metadata,
entities=entities, entities=batch,
) )
await flow("output").send(r) await flow("output").send(r)
except Exception as e: except Exception as e:
@ -92,6 +95,13 @@ class Processor(FlowProcessor):
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
parser.add_argument(
'--batch-size',
type=int,
default=default_batch_size,
help=f'Maximum entities per output message (default: {default_batch_size})'
)
FlowProcessor.add_args(parser) FlowProcessor.add_args(parser)
def run(): def run():

View file

@ -26,6 +26,8 @@ SUBJECT_OF_VALUE = Term(type=IRI, iri=SUBJECT_OF)
default_ident = "kg-extract-definitions" default_ident = "kg-extract-definitions"
default_concurrency = 1 default_concurrency = 1
default_triples_batch_size = 50
default_entity_batch_size = 5
class Processor(FlowProcessor): class Processor(FlowProcessor):
@ -33,6 +35,8 @@ class Processor(FlowProcessor):
id = params.get("id") id = params.get("id")
concurrency = params.get("concurrency", 1) concurrency = params.get("concurrency", 1)
self.triples_batch_size = params.get("triples_batch_size", default_triples_batch_size)
self.entity_batch_size = params.get("entity_batch_size", default_entity_batch_size)
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
@ -173,7 +177,9 @@ class Processor(FlowProcessor):
context=defn["definition"], context=defn["definition"],
)) ))
if triples: # Send triples in batches
for i in range(0, len(triples), self.triples_batch_size):
batch = triples[i:i + self.triples_batch_size]
await self.emit_triples( await self.emit_triples(
flow("triples"), flow("triples"),
Metadata( Metadata(
@ -182,10 +188,12 @@ class Processor(FlowProcessor):
user=v.metadata.user, user=v.metadata.user,
collection=v.metadata.collection, collection=v.metadata.collection,
), ),
triples batch
) )
if entities: # Send entity contexts in batches
for i in range(0, len(entities), self.entity_batch_size):
batch = entities[i:i + self.entity_batch_size]
await self.emit_ecs( await self.emit_ecs(
flow("entity-contexts"), flow("entity-contexts"),
Metadata( Metadata(
@ -194,7 +202,7 @@ class Processor(FlowProcessor):
user=v.metadata.user, user=v.metadata.user,
collection=v.metadata.collection, collection=v.metadata.collection,
), ),
entities batch
) )
except Exception as e: except Exception as e:
@ -212,6 +220,20 @@ class Processor(FlowProcessor):
help=f'Concurrent processing threads (default: {default_concurrency})' help=f'Concurrent processing threads (default: {default_concurrency})'
) )
parser.add_argument(
'--triples-batch-size',
type=int,
default=default_triples_batch_size,
help=f'Maximum triples per output message (default: {default_triples_batch_size})'
)
parser.add_argument(
'--entity-batch-size',
type=int,
default=default_entity_batch_size,
help=f'Maximum entity contexts per output message (default: {default_entity_batch_size})'
)
FlowProcessor.add_args(parser) FlowProcessor.add_args(parser)
def run(): def run():

View file

@ -27,6 +27,8 @@ logger = logging.getLogger(__name__)
default_ident = "kg-extract-ontology" default_ident = "kg-extract-ontology"
default_concurrency = 1 default_concurrency = 1
default_triples_batch_size = 50
default_entity_batch_size = 5
# URI prefix mappings for common namespaces # URI prefix mappings for common namespaces
URI_PREFIXES = { URI_PREFIXES = {
@ -53,6 +55,8 @@ class Processor(FlowProcessor):
def __init__(self, **params): def __init__(self, **params):
id = params.get("id", default_ident) id = params.get("id", default_ident)
concurrency = params.get("concurrency", default_concurrency) concurrency = params.get("concurrency", default_concurrency)
self.triples_batch_size = params.get("triples_batch_size", default_triples_batch_size)
self.entity_batch_size = params.get("entity_batch_size", default_entity_batch_size)
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
@ -315,20 +319,22 @@ class Processor(FlowProcessor):
# Build entity contexts from all triples (including ontology elements) # Build entity contexts from all triples (including ontology elements)
entity_contexts = self.build_entity_contexts(all_triples) entity_contexts = self.build_entity_contexts(all_triples)
# Emit all triples (extracted + ontology definitions) # Emit triples in batches
if all_triples: for i in range(0, len(all_triples), self.triples_batch_size):
batch = all_triples[i:i + self.triples_batch_size]
await self.emit_triples( await self.emit_triples(
flow("triples"), flow("triples"),
v.metadata, v.metadata,
all_triples batch
) )
# Emit entity contexts # Emit entity contexts in batches
if entity_contexts: for i in range(0, len(entity_contexts), self.entity_batch_size):
batch = entity_contexts[i:i + self.entity_batch_size]
await self.emit_entity_contexts( await self.emit_entity_contexts(
flow("entity-contexts"), flow("entity-contexts"),
v.metadata, v.metadata,
entity_contexts batch
) )
logger.info(f"Extracted {len(triples)} content triples + {len(ontology_triples)} ontology triples " logger.info(f"Extracted {len(triples)} content triples + {len(ontology_triples)} ontology triples "
@ -864,6 +870,18 @@ class Processor(FlowProcessor):
default=0.3, default=0.3,
help='Similarity threshold for ontology matching (default: 0.3, range: 0.0-1.0)' help='Similarity threshold for ontology matching (default: 0.3, range: 0.0-1.0)'
) )
parser.add_argument(
'--triples-batch-size',
type=int,
default=default_triples_batch_size,
help=f'Maximum triples per output message (default: {default_triples_batch_size})'
)
parser.add_argument(
'--entity-batch-size',
type=int,
default=default_entity_batch_size,
help=f'Maximum entity contexts per output message (default: {default_entity_batch_size})'
)
FlowProcessor.add_args(parser) FlowProcessor.add_args(parser)

View file

@ -25,6 +25,7 @@ SUBJECT_OF_VALUE = Term(type=IRI, iri=SUBJECT_OF)
default_ident = "kg-extract-relationships" default_ident = "kg-extract-relationships"
default_concurrency = 1 default_concurrency = 1
default_triples_batch_size = 50
class Processor(FlowProcessor): class Processor(FlowProcessor):
@ -32,6 +33,7 @@ class Processor(FlowProcessor):
id = params.get("id") id = params.get("id")
concurrency = params.get("concurrency", 1) concurrency = params.get("concurrency", 1)
self.triples_batch_size = params.get("triples_batch_size", default_triples_batch_size)
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
@ -181,7 +183,9 @@ class Processor(FlowProcessor):
o=Term(type=IRI, iri=v.metadata.id) o=Term(type=IRI, iri=v.metadata.id)
)) ))
if triples: # Send triples in batches
for i in range(0, len(triples), self.triples_batch_size):
batch = triples[i:i + self.triples_batch_size]
await self.emit_triples( await self.emit_triples(
flow("triples"), flow("triples"),
Metadata( Metadata(
@ -190,7 +194,7 @@ class Processor(FlowProcessor):
user=v.metadata.user, user=v.metadata.user,
collection=v.metadata.collection, collection=v.metadata.collection,
), ),
triples batch
) )
except Exception as e: except Exception as e:
@ -208,6 +212,13 @@ class Processor(FlowProcessor):
help=f'Concurrent processing threads (default: {default_concurrency})' help=f'Concurrent processing threads (default: {default_concurrency})'
) )
parser.add_argument(
'--triples-batch-size',
type=int,
default=default_triples_batch_size,
help=f'Maximum triples per output message (default: {default_triples_batch_size})'
)
FlowProcessor.add_args(parser) FlowProcessor.add_args(parser)
def run(): def run():