Have configurable input/output queues on call from kg-relationships

This commit is contained in:
Cyber MacGeddon 2024-08-05 22:10:18 +01:00
parent 8386712574
commit 963944f4aa

View file

@ -11,7 +11,10 @@ import os
from pulsar.schema import JsonSchema from pulsar.schema import JsonSchema
from ... schema import ChunkEmbeddings, Triple, GraphEmbeddings, Source, Value from ... schema import ChunkEmbeddings, Triple, GraphEmbeddings, Source, Value
from ... schema import chunk_embeddings_ingest_queue, triples_store_queue, graph_embeddings_store_queue from ... schema import chunk_embeddings_ingest_queue, triples_store_queue
from ... schema import graph_embeddings_store_queue
from ... schema import text_completion_request_queue
from ... schema import text_completion_response_queue
from ... log_level import LogLevel from ... log_level import LogLevel
from ... llm_client import LlmClient from ... llm_client import LlmClient
from ... prompts import to_relationships from ... prompts import to_relationships
@ -35,6 +38,12 @@ class Processor(ConsumerProducer):
output_queue = params.get("output_queue", default_output_queue) output_queue = params.get("output_queue", default_output_queue)
vector_queue = params.get("vector_queue", default_vector_queue) vector_queue = params.get("vector_queue", default_vector_queue)
subscriber = params.get("subscriber", default_subscriber) subscriber = params.get("subscriber", default_subscriber)
tc_request_queue = params.get(
"text_completion_request_queue", text_completion_request_queue
)
tc_response_queue = params.get(
"text_completion_response_queue", text_completion_response_queue
)
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
@ -43,6 +52,8 @@ class Processor(ConsumerProducer):
"subscriber": subscriber, "subscriber": subscriber,
"input_schema": ChunkEmbeddings, "input_schema": ChunkEmbeddings,
"output_schema": Triple, "output_schema": Triple,
"text_completion_request_queue": tc_request_queue,
"text_completion_response_queue": tc_response_queue,
} }
) )
@ -55,6 +66,8 @@ class Processor(ConsumerProducer):
"input_queue": input_queue, "input_queue": input_queue,
"output_queue": output_queue, "output_queue": output_queue,
"vector_queue": vector_queue, "vector_queue": vector_queue,
"text_completion_request_queue": tc_request_queue,
"text_completion_response_queue": tc_response_queue,
"subscriber": subscriber, "subscriber": subscriber,
"input_schema": ChunkEmbeddings.__name__, "input_schema": ChunkEmbeddings.__name__,
"output_schema": Triple.__name__, "output_schema": Triple.__name__,
@ -63,6 +76,8 @@ class Processor(ConsumerProducer):
self.llm = LlmClient( self.llm = LlmClient(
pulsar_host = self.pulsar_host, pulsar_host = self.pulsar_host,
input_queue=tc_request_queue,
output_queue=tc_response_queue,
subscriber = module + "-llm", subscriber = module + "-llm",
) )
@ -179,6 +194,18 @@ class Processor(ConsumerProducer):
help=f'Vector output queue (default: {default_vector_queue})' help=f'Vector output queue (default: {default_vector_queue})'
) )
parser.add_argument(
'--text-completion-request-queue',
default=text_completion_request_queue,
help=f'Text completion request queue (default: {text_completion_request_queue})',
)
parser.add_argument(
'--text-completion-response-queue',
default=text_completion_response_queue,
help=f'Text completion response queue (default: {text_completion_response_queue})',
)
def run(): def run():
Processor.start(module, __doc__) Processor.start(module, __doc__)