diff --git a/trustgraph-base/trustgraph/base/embeddings_service.py b/trustgraph-base/trustgraph/base/embeddings_service.py index c6befdb7..c0dd3978 100644 --- a/trustgraph-base/trustgraph/base/embeddings_service.py +++ b/trustgraph-base/trustgraph/base/embeddings_service.py @@ -11,20 +11,26 @@ from .. exceptions import TooManyRequests from .. base import FlowProcessor, ConsumerSpec, ProducerSpec default_ident = "embeddings" +default_concurrency = 1 class EmbeddingsService(FlowProcessor): def __init__(self, **params): id = params.get("id") + concurrency = params.get("concurrency", 1) - super(EmbeddingsService, self).__init__(**params | { "id": id }) + super(EmbeddingsService, self).__init__(**params | { + "id": id, + "concurrency": concurrency, + }) self.register_specification( ConsumerSpec( name = "request", schema = EmbeddingsRequest, - handler = self.on_request + handler = self.on_request, + concurrency = concurrency, ) ) @@ -84,6 +90,13 @@ class EmbeddingsService(FlowProcessor): @staticmethod def add_args(parser): + parser.add_argument( + '-c', '--concurrency', + type=int, + default=default_concurrency, + help=f'Concurrent processing threads (default: {default_concurrency})' + ) + FlowProcessor.add_args(parser) diff --git a/trustgraph-base/trustgraph/base/llm_service.py b/trustgraph-base/trustgraph/base/llm_service.py index 627bcbb4..fddbdf3e 100644 --- a/trustgraph-base/trustgraph/base/llm_service.py +++ b/trustgraph-base/trustgraph/base/llm_service.py @@ -128,7 +128,7 @@ class LlmService(FlowProcessor): '-c', '--concurrency', type=int, default=default_concurrency, - help=f'LLM max output tokens (default: {default_concurrency})' + help=f'Concurrent processing threads (default: {default_concurrency})' ) FlowProcessor.add_args(parser) diff --git a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py index f95dadf9..66571478 100755 --- a/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/definitions/extract.py @@ -21,16 +21,19 @@ RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True) SUBJECT_OF_VALUE = Value(value=SUBJECT_OF, is_uri=True) default_ident = "kg-extract-definitions" +default_concurrency = 1 class Processor(FlowProcessor): def __init__(self, **params): id = params.get("id") + concurrency = params.get("concurrency", 1) super(Processor, self).__init__( **params | { "id": id, + "concurrency": concurrency, } ) @@ -38,7 +41,8 @@ class Processor(FlowProcessor): ConsumerSpec( name = "input", schema = Chunk, - handler = self.on_message + handler = self.on_message, + concurrency = concurrency, ) ) @@ -190,6 +194,13 @@ class Processor(FlowProcessor): @staticmethod def add_args(parser): + parser.add_argument( + '-c', '--concurrency', + type=int, + default=default_concurrency, + help=f'Concurrent processing threads (default: {default_concurrency})' + ) + FlowProcessor.add_args(parser) def run(): diff --git a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py index 63670a7d..dafee77d 100755 --- a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py @@ -20,16 +20,19 @@ RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True) SUBJECT_OF_VALUE = Value(value=SUBJECT_OF, is_uri=True) default_ident = "kg-extract-relationships" +default_concurrency = 1 class Processor(FlowProcessor): def __init__(self, **params): id = params.get("id") + concurrency = params.get("concurrency", 1) super(Processor, self).__init__( **params | { "id": id, + "concurrency": concurrency, } ) @@ -37,7 +40,8 @@ class Processor(FlowProcessor): ConsumerSpec( name = "input", schema = Chunk, - handler = self.on_message + handler = self.on_message, + concurrency = concurrency, ) ) @@ -192,6 +196,13 @@ class Processor(FlowProcessor): @staticmethod def add_args(parser): + parser.add_argument( + '-c', '--concurrency', + type=int, + default=default_concurrency, + help=f'Concurrent processing threads (default: {default_concurrency})' + ) + FlowProcessor.add_args(parser) def run(): diff --git a/trustgraph-flow/trustgraph/model/prompt/template/service.py b/trustgraph-flow/trustgraph/model/prompt/template/service.py index 67590c1c..7bebf5f4 100755 --- a/trustgraph-flow/trustgraph/model/prompt/template/service.py +++ b/trustgraph-flow/trustgraph/model/prompt/template/service.py @@ -18,12 +18,14 @@ from .... base import ProducerSpec, ConsumerSpec, TextCompletionClientSpec from . prompt_manager import PromptConfiguration, Prompt, PromptManager default_ident = "prompt" +default_concurrency = 1 class Processor(FlowProcessor): def __init__(self, **params): id = params.get("id") + concurrency = params.get("concurrency", 1) # Config key for prompts self.config_key = params.get("config_type", "prompt") @@ -31,6 +33,7 @@ class Processor(FlowProcessor): super(Processor, self).__init__( **params | { "id": id, + "concurrency": concurrency, } ) @@ -38,7 +41,8 @@ class Processor(FlowProcessor): ConsumerSpec( name = "request", schema = PromptRequest, - handler = self.on_request + handler = self.on_request, + concurrency = concurrency, ) ) @@ -219,6 +223,13 @@ class Processor(FlowProcessor): @staticmethod def add_args(parser): + parser.add_argument( + '-c', '--concurrency', + type=int, + default=default_concurrency, + help=f'Concurrent processing threads (default: {default_concurrency})' + ) + FlowProcessor.add_args(parser) parser.add_argument( diff --git a/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py b/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py index 5d3cc2f4..328ae3f9 100755 --- a/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py +++ b/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py @@ -11,12 +11,14 @@ from ... base import PromptClientSpec, EmbeddingsClientSpec from ... base import GraphEmbeddingsClientSpec, TriplesClientSpec default_ident = "graph-rag" +default_concurrency = 1 class Processor(FlowProcessor): def __init__(self, **params): id = params.get("id", default_ident) + concurrency = params.get("concurrency", 1) entity_limit = params.get("entity_limit", 50) triple_limit = params.get("triple_limit", 30) @@ -26,6 +28,7 @@ class Processor(FlowProcessor): super(Processor, self).__init__( **params | { "id": id, + "concurrency": concurrency, "entity_limit": entity_limit, "triple_limit": triple_limit, "max_subgraph_size": max_subgraph_size, @@ -43,6 +46,7 @@ class Processor(FlowProcessor): name = "request", schema = GraphRagQuery, handler = self.on_request, + concurrency = concurrency, ) ) @@ -157,6 +161,13 @@ class Processor(FlowProcessor): @staticmethod def add_args(parser): + parser.add_argument( + '-c', '--concurrency', + type=int, + default=default_concurrency, + help=f'Concurrent processing threads (default: {default_concurrency})' + ) + FlowProcessor.add_args(parser) parser.add_argument(