From 963944f4aa5479766b7bf4f6b3bc816c105bcb10 Mon Sep 17 00:00:00 2001
From: Cyber MacGeddon
Date: Mon, 5 Aug 2024 22:10:18 +0100
Subject: [PATCH] Have configurable input/output queues on call from
 kg-relationships

---
 .../kg/extract_relationships/extract.py | 29 ++++++++++++++++++-
 1 file changed, 28 insertions(+), 1 deletion(-)

diff --git a/trustgraph/kg/extract_relationships/extract.py b/trustgraph/kg/extract_relationships/extract.py
index b3e34549..e2c63c4f 100755
--- a/trustgraph/kg/extract_relationships/extract.py
+++ b/trustgraph/kg/extract_relationships/extract.py
@@ -11,7 +11,10 @@ import os
 
 from pulsar.schema import JsonSchema
 from ... schema import ChunkEmbeddings, Triple, GraphEmbeddings, Source, Value
-from ... schema import chunk_embeddings_ingest_queue, triples_store_queue, graph_embeddings_store_queue
+from ... schema import chunk_embeddings_ingest_queue, triples_store_queue
+from ... schema import graph_embeddings_store_queue
+from ... schema import text_completion_request_queue
+from ... schema import text_completion_response_queue
 from ... log_level import LogLevel
 from ... llm_client import LlmClient
 from ... prompts import to_relationships
@@ -35,6 +38,12 @@ class Processor(ConsumerProducer):
         output_queue = params.get("output_queue", default_output_queue)
         vector_queue = params.get("vector_queue", default_vector_queue)
         subscriber = params.get("subscriber", default_subscriber)
+        tc_request_queue = params.get(
+            "text_completion_request_queue", text_completion_request_queue
+        )
+        tc_response_queue = params.get(
+            "text_completion_response_queue", text_completion_response_queue
+        )
 
         super(Processor, self).__init__(
             **params | {
@@ -43,6 +52,8 @@ class Processor(ConsumerProducer):
                 "subscriber": subscriber,
                 "input_schema": ChunkEmbeddings,
                 "output_schema": Triple,
+                "text_completion_request_queue": tc_request_queue,
+                "text_completion_response_queue": tc_response_queue,
             }
         )
 
@@ -55,6 +66,8 @@ class Processor(ConsumerProducer):
             "input_queue": input_queue,
             "output_queue": output_queue,
             "vector_queue": vector_queue,
+            "text_completion_request_queue": tc_request_queue,
+            "text_completion_response_queue": tc_response_queue,
             "subscriber": subscriber,
             "input_schema": ChunkEmbeddings.__name__,
             "output_schema": Triple.__name__,
@@ -63,6 +76,8 @@ class Processor(ConsumerProducer):
 
         self.llm = LlmClient(
             pulsar_host = self.pulsar_host,
+            input_queue=tc_request_queue,
+            output_queue=tc_response_queue,
             subscriber = module + "-llm",
         )
 
@@ -179,6 +194,18 @@ class Processor(ConsumerProducer):
             help=f'Vector output queue (default: {default_vector_queue})'
         )
 
+        parser.add_argument(
+            '--text-completion-request-queue',
+            default=text_completion_request_queue,
+            help=f'Text completion request queue (default: {text_completion_request_queue})',
+        )
+
+        parser.add_argument(
+            '--text-completion-response-queue',
+            default=text_completion_response_queue,
+            help=f'Text completion response queue (default: {text_completion_response_queue})',
+        )
+
 def run():
 
     Processor.start(module, __doc__)
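
Not part of the patch: a minimal, self-contained sketch of the pattern the diff applies, an argparse flag whose default is a module-level queue-name constant, with the same constant reused as the params.get() fallback in the processor. The queue-name strings below are illustrative stand-ins; the real defaults live in trustgraph.schema and their values are not shown in this patch.

import argparse

# Stand-ins for the constants imported from trustgraph.schema
# (illustrative values, not the project's real queue names):
text_completion_request_queue = "text-completion-request"
text_completion_response_queue = "text-completion-response"

parser = argparse.ArgumentParser()
parser.add_argument(
    '--text-completion-request-queue',
    default=text_completion_request_queue,
    help=f'Text completion request queue (default: {text_completion_request_queue})',
)
parser.add_argument(
    '--text-completion-response-queue',
    default=text_completion_response_queue,
    help=f'Text completion response queue (default: {text_completion_response_queue})',
)

# argparse maps --text-completion-request-queue to the
# text_completion_request_queue attribute, so vars() yields the same
# keys the processor later reads back out of params.
args = parser.parse_args(['--text-completion-request-queue', 'my-tc-requests'])
params = vars(args)

# The same fallback the patch adds to Processor.__init__: an explicit
# param wins, otherwise the schema default is used.
tc_request_queue = params.get(
    "text_completion_request_queue", text_completion_request_queue
)
tc_response_queue = params.get(
    "text_completion_response_queue", text_completion_response_queue
)
print(tc_request_queue)   # my-tc-requests (overridden)
print(tc_response_queue)  # text-completion-response (default)

On the command line the override would look like: kg-extract-relationships --text-completion-request-queue my-tc-requests (the executable name is a guess from the module path; the patch itself only shows the flags).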