diff --git a/trustgraph/base/processor.py b/trustgraph/base/processor.py index 90cbb132..e214b320 100644 --- a/trustgraph/base/processor.py +++ b/trustgraph/base/processor.py @@ -196,7 +196,6 @@ class ConsumerProducer(BaseProcessor): def send(self, msg, properties={}): - print(msg) self.producer.send(msg, properties) @staticmethod @@ -250,7 +249,6 @@ class Producer(BaseProcessor): def send(self, msg, properties={}): - print(msg) self.producer.send(msg, properties) @staticmethod diff --git a/trustgraph/graph/cassandra_write/write.py b/trustgraph/graph/cassandra_write/write.py index 01ca72f1..df317c2a 100755 --- a/trustgraph/graph/cassandra_write/write.py +++ b/trustgraph/graph/cassandra_write/write.py @@ -4,8 +4,6 @@ Graph writer. Input is graph edge. Writes edges to Cassandra graph. """ import pulsar -from pulsar.schema import JsonSchema -import tempfile import base64 import os import argparse @@ -14,135 +12,64 @@ import time from ... trustgraph import TrustGraph from ... schema import Triple from ... log_level import LogLevel +from ... base import Consumer -default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') default_input_queue = 'graph-load' default_subscriber = 'graph-write-cassandra' default_graph_host='localhost' -class Processor: +class Processor(Consumer): def __init__( self, - pulsar_host=default_pulsar_host, + pulsar_host=None, input_queue=default_input_queue, subscriber=default_subscriber, graph_host=default_graph_host, log_level=LogLevel.INFO, ): - self.client = None - - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - - self.consumer = self.client.subscribe( - input_queue, subscriber, - schema=JsonSchema(Triple), + super(Processor, self).__init__( + pulsar_host=pulsar_host, + log_level=log_level, + input_queue=input_queue, + subscriber=subscriber, + input_schema=Triple, ) self.tg = TrustGraph([graph_host]) self.count = 0 - def run(self): + def handle(self, msg): - while True: + v = msg.value() - msg = self.consumer.receive() + self.tg.insert( + v.s.value, + v.p.value, + v.o.value + ) - try: + self.count += 1 - v = msg.value() + if (self.count % 1000) == 0: + print(self.count, "...", flush=True) - self.tg.insert( - v.s.value, - v.p.value, - v.o.value - ) + @staticmethod + def add_args(parser): - self.count += 1 + Consumer.add_args( + parser, default_input_queue, default_subscriber, + ) - if (self.count % 1000) == 0: - print(self.count, "...", flush=True) - - # Acknowledge successful processing of the message - self.consumer.acknowledge(msg) - - except Exception as e: - - print("Exception:", e, flush=True) - - # Message failed to be processed - self.consumer.negative_acknowledge(msg) - - def __del__(self): - - if self.client: - self.client.close() + parser.add_argument( + '-g', '--graph-host', + default="localhost", + help=f'Graph host (default: localhost)' + ) def run(): - parser = argparse.ArgumentParser( - prog='graph-write-cassandra', - description=__doc__, - ) - - parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', - ) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) - - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.INFO, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - parser.add_argument( - '-g', '--graph-host', - default="localhost", - help=f'Output queue (default: localhost)' - ) - - args = parser.parse_args() - - while True: - - try: - - p = Processor( - pulsar_host=args.pulsar_host, - input_queue=args.input_queue, - subscriber=args.subscriber, - log_level=args.log_level, - graph_host=args.graph_host, - ) - - p.run() - - except Exception as e: - - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - - time.sleep(10) - + Processor.start("graph-write-cassandra", __doc__) diff --git a/trustgraph/llm/ollama_text/llm.py b/trustgraph/llm/ollama_text/llm.py index b8669876..b522e225 100755 --- a/trustgraph/llm/ollama_text/llm.py +++ b/trustgraph/llm/ollama_text/llm.py @@ -4,14 +4,7 @@ Simple LLM service, performs text prompt completion using an Ollama service. Input is prompt, output is response. """ -import pulsar -from pulsar.schema import JsonSchema -import tempfile -import base64 -import os -import argparse from langchain_community.llms import Ollama -import time from ... schema import TextCompletionRequest, TextCompletionResponse from ... log_level import LogLevel @@ -42,8 +35,8 @@ class Processor(ConsumerProducer): input_queue=input_queue, output_queue=output_queue, subscriber=subscriber, - request_schema=TextCompletionRequest, - response_schema=TextCompletionResponse, + input_schema=TextCompletionRequest, + output_schema=TextCompletionResponse, ) self.llm = Ollama(base_url=ollama, model=model) diff --git a/trustgraph/vector/milvus_write/write.py b/trustgraph/vector/milvus_write/write.py index 579c576d..be40ae32 100755 --- a/trustgraph/vector/milvus_write/write.py +++ b/trustgraph/vector/milvus_write/write.py @@ -15,126 +15,55 @@ import time from ... schema import VectorsAssociation from ... log_level import LogLevel from ... triple_vectors import TripleVectors +from ... base import Consumer -default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') default_input_queue = 'vectors-load' default_subscriber = 'vector-write-milvus' default_store_uri = 'http://localhost:19530' -class Processor: +class Processor(Consumer): def __init__( self, - pulsar_host=default_pulsar_host, + pulsar_host=None, input_queue=default_input_queue, subscriber=default_subscriber, store_uri=default_store_uri, log_level=LogLevel.INFO, ): - self.client = None - - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) - ) - - self.consumer = self.client.subscribe( - input_queue, subscriber, - schema=JsonSchema(VectorsAssociation), + super(Processor, self).__init__( + pulsar_host=pulsar_host, + log_level=log_level, + input_queue=input_queue, + subscriber=subscriber, + input_schema=VectorsAssociation, ) self.vecstore = TripleVectors(store_uri) - def run(self): + def handle(self, msg): - while True: + v = msg.value() - msg = self.consumer.receive() + if v.entity.value != "": + for vec in v.vectors: + self.vecstore.insert(vec, v.entity.value) + @staticmethod + def add_args(parser): - try: + Consumer.add_args( + parser, default_input_queue, default_subscriber, + ) - v = msg.value() - - if v.entity.value != "": - for vec in v.vectors: - self.vecstore.insert(vec, v.entity.value) - - # Acknowledge successful processing of the message - self.consumer.acknowledge(msg) - - except Exception as e: - - print("Exception:", e, flush=True) - - # Message failed to be processed - self.consumer.negative_acknowledge(msg) - - def __del__(self): - - if self.client: - self.client.close() + parser.add_argument( + '-t', '--store-uri', + default="http://milvus:19530", + help=f'Milvus store URI (default: http://milvus:19530)' + ) def run(): - parser = argparse.ArgumentParser( - prog='pdf-decoder', - description=__doc__, - ) - - parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', - ) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) - - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.INFO, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - parser.add_argument( - '-t', '--store-uri', - default="http://milvus:19530", - help=f'Milvus store URI (default: http://milvus:19530)' - ) - - args = parser.parse_args() - - while True: - - try: - - p = Processor( - pulsar_host=args.pulsar_host, - input_queue=args.input_queue, - subscriber=args.subscriber, - store_uri=args.store_uri, - log_level=args.log_level, - ) - - p.run() - - except Exception as e: - - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - - time.sleep(10) - + Processor.start("vector-write-milvus", __doc__) +