From df5e7c42b9bb5cd4f08d83a1e9a9478a4f8d7f47 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Mon, 5 Aug 2024 21:48:52 +0100 Subject: [PATCH] Make embeddings-vectorize have arguments to configure the embeddings queue names --- trustgraph/embeddings/vectorize/vectorize.py | 23 ++++++++++++++++++++ trustgraph/embeddings_client.py | 15 ++++++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/trustgraph/embeddings/vectorize/vectorize.py b/trustgraph/embeddings/vectorize/vectorize.py index bac0e932..1500b254 100755 --- a/trustgraph/embeddings/vectorize/vectorize.py +++ b/trustgraph/embeddings/vectorize/vectorize.py @@ -6,6 +6,7 @@ Input is text chunk, output is chunk and vectors. from ... schema import Chunk, ChunkEmbeddings from ... schema import chunk_ingest_queue, chunk_embeddings_ingest_queue +from ... schema import embeddings_request_queue, embeddings_response_queue from ... embeddings_client import EmbeddingsClient from ... log_level import LogLevel from ... base import ConsumerProducer @@ -23,11 +24,19 @@ class Processor(ConsumerProducer): input_queue = params.get("input_queue", default_input_queue) output_queue = params.get("output_queue", default_output_queue) subscriber = params.get("subscriber", default_subscriber) + emb_request_queue = params.get( + "embeddings_request_queue", embeddings_request_queue + ) + emb_response_queue = params.get( + "embeddings_response_queue", embeddings_response_queue + ) super(Processor, self).__init__( **params | { "input_queue": input_queue, "output_queue": output_queue, + "embeddings_request_queue": emb_request_queue, + "embeddings_response_queue": emb_response_queue, "subscriber": subscriber, "input_schema": Chunk, "output_schema": ChunkEmbeddings, @@ -36,6 +45,8 @@ class Processor(ConsumerProducer): self.embeddings = EmbeddingsClient( pulsar_host=self.pulsar_host, + input_queue=emb_request_queue, + output_queue=emb_response_queue, subscriber=module + "-emb", ) @@ -74,6 +85,18 @@ class Processor(ConsumerProducer): default_output_queue, ) + parser.add_argument( + '--embeddings-request-queue', + default=embeddings_request_queue, + help=f'Embeddings request queue (default: {embeddings_request_queue})', + ) + + parser.add_argument( + '--embeddings-response-queue', + default=embeddings_response_queue, + help=f'Embeddings request queue (default: {embeddings_response_queue})', + ) + def run(): Processor.start(module, __doc__) diff --git a/trustgraph/embeddings_client.py b/trustgraph/embeddings_client.py index 9b7fa81a..6daa2ff5 100644 --- a/trustgraph/embeddings_client.py +++ b/trustgraph/embeddings_client.py @@ -17,12 +17,21 @@ DEBUG=_pulsar.LoggerLevel.Debug class EmbeddingsClient: def __init__( - self, log_level=ERROR, subscriber=None, + self, log_level=ERROR, + input_queue=None, + output_queue=None, + subscriber=None, pulsar_host="pulsar://pulsar:6650", ): self.client = None + if input_queue == None: + input_queue=embeddings_request_queue + + if output_queue == None: + output_queue=embeddings_response_queue + if subscriber == None: subscriber = str(uuid.uuid4()) @@ -32,13 +41,13 @@ class EmbeddingsClient: ) self.producer = self.client.create_producer( - topic=embeddings_request_queue, + topic=input_queue, schema=JsonSchema(EmbeddingsRequest), chunking_enabled=True, ) self.consumer = self.client.subscribe( - embeddings_response_queue, subscriber, + output_queue, subscriber, schema=JsonSchema(EmbeddingsResponse), )