diff --git a/trustgraph/chunker/recursive/chunker.py b/trustgraph/chunker/recursive/chunker.py index 16e3c992..b26ed70b 100755 --- a/trustgraph/chunker/recursive/chunker.py +++ b/trustgraph/chunker/recursive/chunker.py @@ -4,28 +4,19 @@ Simple decoder, accepts text documents on input, outputs chunks from the as text as separate output objects. """ -import pulsar -from pulsar.schema import JsonSchema -import tempfile -import base64 -import os -import argparse -from langchain_text_splitters import RecursiveCharacterTextSplitter -import time - from ... schema import TextDocument, Chunk, Source from ... log_level import LogLevel +from ... base import ConsumerProducer -default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') default_input_queue = 'text-doc-load' default_output_queue = 'chunk-load' default_subscriber = 'chunker-recursive' -class Processor: +class Processor(ConsumerProducer): def __init__( self, - pulsar_host=default_pulsar_host, + pulsar_host=None, input_queue=default_input_queue, output_queue=default_output_queue, subscriber=default_subscriber, @@ -34,21 +25,14 @@ class Processor: chunk_overlap=100, ): - self.client = None - - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - - self.consumer = self.client.subscribe( - input_queue, subscriber, - schema=JsonSchema(TextDocument), - ) - - self.producer = self.client.create_producer( - topic=output_queue, - schema=JsonSchema(Chunk), + super(Processor, self).__init__( + pulsar_host=pulsar_host, + log_level=log_level, + input_queue=input_queue, + output_queue=output_queue, + subscriber=subscriber, + input_schema=TextDocument, + output_schema=Chunk, ) self.text_splitter = RecursiveCharacterTextSplitter( @@ -58,134 +42,55 @@ class Processor: is_separator_regex=False, ) - print("Chunker inited") + def handle(self, msg): - def run(self): + v = msg.value() + print(f"Chunking {v.source.id}...", flush=True) - print("Chunker running") + texts = self.text_splitter.create_documents( + [v.text.decode("utf-8")] + ) - while True: + for ix, chunk in enumerate(texts): - msg = self.consumer.receive() - print("Chunker message received") + id = v.source.id + "-c" + str(ix) - try: + r = Chunk( + source=Source( + source=v.source.source, + id=id, + title=v.source.title + ), + chunk=chunk.page_content.encode("utf-8"), + ) - v = msg.value() - print(f"Chunking {v.source.id}...", flush=True) + self.send(r) - texts = self.text_splitter.create_documents( - [v.text.decode("utf-8")] - ) + print("Done.", flush=True) - for ix, chunk in enumerate(texts): + @staticmethod + def add_args(parser): - id = v.source.id + "-c" + str(ix) + ConsumerProducer.add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ) - r = Chunk( - source=Source( - source=v.source.source, - id=id, - title=v.source.title - ), - chunk=chunk.page_content.encode("utf-8"), - ) + parser.add_argument( + '-z', '--chunk-size', + type=int, + default=2000, + help=f'Chunk size (default: 2000)' + ) - self.producer.send(r) - - # Acknowledge successful processing of the message - self.consumer.acknowledge(msg) - - print("Done.", flush=True) - - except Exception as e: - print(e, flush=True) - - # Message failed to be processed - self.consumer.negative_acknowledge(msg) - - def __del__(self): - - if self.client: - self.client.close() + parser.add_argument( + '-v', '--chunk-overlap', + type=int, + default=100, + help=f'Chunk overlap (default: 100)' + ) def run(): - parser = argparse.ArgumentParser( - prog='pdf-decoder', - description=__doc__, - ) - - parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', - ) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) - - parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) - - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.INFO, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - parser.add_argument( - '-z', '--chunk-size', - type=int, - default=2000, - help=f'Chunk size (default: 2000)' - ) - - parser.add_argument( - '-v', '--chunk-overlap', - type=int, - default=100, - help=f'Chunk overlap (default: 100)' - ) - - args = parser.parse_args() - - - while True: - - try: - - p = Processor( - pulsar_host=args.pulsar_host, - input_queue=args.input_queue, - output_queue=args.output_queue, - subscriber=args.subscriber, - log_level=args.log_level, - chunk_size=args.chunk_size, - chunk_overlap=args.chunk_overlap, - ) - - p.run() - - except Exception as e: - - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - - time.sleep(10) - + Processor.start('chunker', __doc__) diff --git a/trustgraph/decoder/pdf/pdf_decoder.py b/trustgraph/decoder/pdf/pdf_decoder.py index f2ff56a2..dd241589 100755 --- a/trustgraph/decoder/pdf/pdf_decoder.py +++ b/trustgraph/decoder/pdf/pdf_decoder.py @@ -4,171 +4,82 @@ Simple decoder, accepts PDF documents on input, outputs pages from the PDF document as text as separate output objects. """ -import pulsar -from pulsar.schema import JsonSchema from langchain_community.document_loaders import PyPDFLoader -import tempfile -import base64 -import os -import argparse -import time from ... schema import Document, TextDocument, Source from ... log_level import LogLevel +from ... base import ConsumerProducer -default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') default_input_queue = 'document-load' default_output_queue = 'text-doc-load' default_subscriber = 'pdf-decoder' -class Processor: +class Processor(ConsumerProducer): def __init__( self, - pulsar_host=default_pulsar_host, + pulsar_host=None, input_queue=default_input_queue, output_queue=default_output_queue, subscriber=default_subscriber, log_level=LogLevel.INFO, ): - self.client = None - - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - - self.consumer = self.client.subscribe( - input_queue, subscriber, - schema=JsonSchema(Document), - ) - - self.producer = self.client.create_producer( - topic=output_queue, - schema=JsonSchema(TextDocument), + super(Processor, self).__init__( + pulsar_host=pulsar_host, + log_level=log_level, + input_queue=input_queue, + output_queue=output_queue, + subscriber=subscriber, + input_schema=Document, + output_schema=TextDocument, ) print("PDF inited") - print("Pulsar", pulsar_host) - print("Input", input_queue) - print("Output", output_queue) - print("Subscriber", subscriber) + def handle(self, msg): - def run(self): + print("PDF message received") - print("PDF running") + v = msg.value() - while True: + print(f"Decoding {v.source.id}...", flush=True) - msg = self.consumer.receive() + with tempfile.NamedTemporaryFile(delete_on_close=False) as fp: - print("PDF message received") + fp.write(base64.b64decode(v.data)) + fp.close() - try: + with open(fp.name, mode='rb') as f: - v = msg.value() - print(f"Decoding {v.source.id}...", flush=True) + loader = PyPDFLoader(fp.name) + pages = loader.load() - with tempfile.NamedTemporaryFile(delete_on_close=False) as fp: + for ix, page in enumerate(pages): - fp.write(base64.b64decode(v.data)) - fp.close() + id = v.source.id + "-p" + str(ix) + r = TextDocument( + source=Source( + source=v.source.source, + title=v.source.title, + id=id, + ), + text=page.page_content.encode("utf-8"), + ) - with open(fp.name, mode='rb') as f: + self.send(r) - loader = PyPDFLoader(fp.name) - pages = loader.load() + print("Done.", flush=True) - for ix, page in enumerate(pages): + @staticmethod + def add_args(parser): - id = v.source.id + "-p" + str(ix) - r = TextDocument( - source=Source( - source=v.source.source, - title=v.source.title, - id=id, - ), - text=page.page_content.encode("utf-8"), - ) - - self.producer.send(r) - - # Acknowledge successful processing of the message - self.consumer.acknowledge(msg) - - print("Done.", flush=True) - - except Exception as e: - print(e, flush=True) - - # Message failed to be processed - self.consumer.negative_acknowledge(msg) - - def __del__(self): - - if self.client: - self.client.close() + ConsumerProducer.add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ) def run(): - parser = argparse.ArgumentParser( - prog='pdf-decoder', - description=__doc__, - ) - - parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', - ) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) - - parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) - - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.INFO, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - args = parser.parse_args() - - while True: - - try: - p = Processor( - pulsar_host=args.pulsar_host, - input_queue=args.input_queue, - output_queue=args.output_queue, - subscriber=args.subscriber, - log_level=args.log_level, - ) - - p.run() - - except Exception as e: - - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - - time.sleep(10) + Processor.start("pdf-decoder", __doc__) diff --git a/trustgraph/embeddings/vectorize/vectorize.py b/trustgraph/embeddings/vectorize/vectorize.py index deeb280e..e779c6ed 100755 --- a/trustgraph/embeddings/vectorize/vectorize.py +++ b/trustgraph/embeddings/vectorize/vectorize.py @@ -4,49 +4,34 @@ Vectorizer, calls the embeddings service to get embeddings for a chunk. Input is text chunk, output is chunk and vectors. """ -import pulsar -from pulsar.schema import JsonSchema -import tempfile -import base64 -import os -import argparse -import time - from ... schema import Chunk, VectorsChunk from ... embeddings_client import EmbeddingsClient from ... log_level import LogLevel +from ... base import ConsumerProducer -default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') default_input_queue = 'chunk-load' default_output_queue = 'vectors-chunk-load' default_subscriber = 'embeddings-vectorizer' -class Processor: +class Processor(ConsumerProducer): def __init__( self, - pulsar_host=default_pulsar_host, + pulsar_host=None, input_queue=default_input_queue, output_queue=default_output_queue, subscriber=default_subscriber, log_level=LogLevel.INFO, ): - self.client = None - - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - - self.consumer = self.client.subscribe( - input_queue, subscriber, - schema=JsonSchema(Chunk), - ) - - self.producer = self.client.create_producer( - topic=output_queue, - schema=JsonSchema(VectorsChunk), + super(Processor, self).__init__( + pulsar_host=pulsar_host, + log_level=log_level, + input_queue=input_queue, + output_queue=output_queue, + subscriber=subscriber, + input_schema=Chunk, + output_schema=VectorsChunk, ) self.embeddings = EmbeddingsClient(pulsar_host=pulsar_host) @@ -56,108 +41,37 @@ class Processor: r = VectorsChunk(source=source, chunk=chunk, vectors=vectors) self.producer.send(r) - def run(self): + def handle(self, msg): - while True: + v = msg.value() + print(f"Indexing {v.source.id}...", flush=True) - msg = self.consumer.receive() - - try: - - v = msg.value() - print(f"Indexing {v.source.id}...", flush=True) - - chunk = v.chunk.decode("utf-8") - - try: - - vectors = self.embeddings.request(chunk) - - self.emit( - source=v.source, - chunk=chunk.encode("utf-8"), - vectors=vectors - ) - - except Exception as e: - print("Exception:", e, flush=True) - - print("Done.", flush=True) - - # Acknowledge successful processing of the message - self.consumer.acknowledge(msg) - - except Exception as e: - - print("Exception:", e, flush=True) - - # Message failed to be processed - self.consumer.negative_acknowledge(msg) - - def __del__(self): - - if self.client: - self.client.close() - -def run(): - - parser = argparse.ArgumentParser( - prog='embeddings-vectorizer', - description=__doc__, - ) - - parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', - ) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) - - parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) - - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.INFO, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - args = parser.parse_args() - - while True: + chunk = v.chunk.decode("utf-8") try: - p = Processor( - pulsar_host=args.pulsar_host, - input_queue=args.input_queue, - output_queue=args.output_queue, - subscriber=args.subscriber, - log_level=args.log_level, + vectors = self.embeddings.request(chunk) + + self.emit( + source=v.source, + chunk=chunk.encode("utf-8"), + vectors=vectors ) - p.run() - except Exception as e: - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - time.sleep(10) + print("Done.", flush=True) + + @staticmethod + def add_args(parser): + + ConsumerProducer.add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ) + +def run(): + + Processor.start("embeddings-vectorize", __doc__) diff --git a/trustgraph/rag/graph/rag.py b/trustgraph/rag/graph/rag.py index 0ac7b12e..4a9b820e 100755 --- a/trustgraph/rag/graph/rag.py +++ b/trustgraph/rag/graph/rag.py @@ -4,30 +4,22 @@ Simple RAG service, performs query using graph RAG an LLM. Input is query, output is response. """ -import pulsar -from pulsar.schema import JsonSchema -import tempfile -import base64 -import os -import argparse -import time - from ... schema import GraphRagQuery, GraphRagResponse from ... log_level import LogLevel from ... graph_rag import GraphRag +from ... base import ConsumerProducer -default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') default_input_queue = 'graph-rag-query' default_output_queue = 'graph-rag-response' default_subscriber = 'graph-rag' default_graph_hosts = [ 'localhost' ] default_vector_store = 'http://localhost:19530' -class Processor: +class Processor(ConsumerProducer): def __init__( self, - pulsar_host=default_pulsar_host, + pulsar_host=None, input_queue=default_input_queue, output_queue=default_output_queue, subscriber=default_subscriber, @@ -39,21 +31,14 @@ class Processor: max_sg_size=3000, ): - self.client = None - - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - - self.consumer = self.client.subscribe( - input_queue, subscriber, - schema=JsonSchema(GraphRagQuery), - ) - - self.producer = self.client.create_producer( - topic=output_queue, - schema=JsonSchema(GraphRagResponse), + super(Processor, self).__init__( + pulsar_host=pulsar_host, + log_level=log_level, + input_queue=input_queue, + output_queue=output_queue, + subscriber=subscriber, + input_schema=TextCompletionRequest, + output_schema=TextCompletionResponse, ) self.rag = GraphRag( @@ -66,142 +51,67 @@ class Processor: max_sg_size=max_sg_size, ) - def run(self): + def handle(self, msg): - while True: + v = msg.value() - msg = self.consumer.receive() + # Sender-produced ID - try: + id = msg.properties()["id"] - v = msg.value() + print(f"Handling input {id}...", flush=True) - # Sender-produced ID + response = self.rag.query(v.query) - id = msg.properties()["id"] + print("Send response...", flush=True) + r = GraphRagResponse(response = response) + self.producer.send(r, properties={"id": id}) - print(f"Handling input {id}...", flush=True) + print("Done.", flush=True) - response = self.rag.query(v.query) + @staticmethod + def add_args(parser): - print("Send response...", flush=True) - r = GraphRagResponse(response = response) - self.producer.send(r, properties={"id": id}) + ConsumerProducer.add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ) - print("Done.", flush=True) + parser.add_argument( + '-g', '--graph-hosts', + default='cassandra', + help=f'Graph hosts, comma separated (default: cassandra)' + ) - # Acknowledge successful processing of the message - self.consumer.acknowledge(msg) + parser.add_argument( + '-v', '--vector-store', + default='http://milvus:19530', + help=f'Vector host (default: http://milvus:19530)' + ) - except Exception as e: + parser.add_argument( + '-e', '--entity-limit', + type=int, + default=50, + help=f'Entity vector fetch limit (default: 50)' + ) - print("Exception:", e, flush=True) + parser.add_argument( + '-t', '--triple-limit', + type=int, + default=30, + help=f'Triple query limit, per query (default: 30)' + ) - # Message failed to be processed - self.consumer.negative_acknowledge(msg) - - def __del__(self): - - if self.client: - self.client.close() + parser.add_argument( + '-u', '--max-subgraph-size', + type=int, + default=3000, + help=f'Max subgraph size (default: 3000)' + ) def run(): - parser = argparse.ArgumentParser( - prog='graph-rag', - description=__doc__, - ) + Processor.start('graph-rag', __doc__) - parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', - ) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) - - parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) - - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.INFO, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - parser.add_argument( - '-g', '--graph-hosts', - default='cassandra', - help=f'Graph hosts, comma separated (default: cassandra)' - ) - - parser.add_argument( - '-v', '--vector-store', - default='http://milvus:19530', - help=f'Vector host (default: http://milvus:19530)' - ) - - parser.add_argument( - '-e', '--entity-limit', - type=int, - default=50, - help=f'Entity vector fetch limit (default: 50)' - ) - - parser.add_argument( - '-t', '--triple-limit', - type=int, - default=30, - help=f'Triple query limit, per query (default: 30)' - ) - - parser.add_argument( - '-u', '--max-subgraph-size', - type=int, - default=3000, - help=f'Max subgraph size (default: 3000)' - ) - - args = parser.parse_args() - while True: - - try: - - p = Processor( - pulsar_host=args.pulsar_host, - input_queue=args.input_queue, - output_queue=args.output_queue, - subscriber=args.subscriber, - log_level=args.log_level, - graph_hosts=args.graph_hosts.split(","), - vector_store=args.vector_store, - entity_limit=args.entity_limit, - triple_limit=args.triple_limit, - max_sg_size=args.max_subgraph_size, - ) - - p.run() - - except Exception as e: - - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - - time.sleep(10) -