From 605a455e07d8585ef5447b4dc2c754e37aa6aad0 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Wed, 17 Jul 2024 15:13:12 +0100 Subject: [PATCH] Base class --- trustgraph/base/__init__.py | 3 + trustgraph/base/processor.py | 79 +++++++++++++++++++ trustgraph/llm/ollama_text/llm.py | 123 ++++++++++-------------------- 3 files changed, 121 insertions(+), 84 deletions(-) create mode 100644 trustgraph/base/__init__.py create mode 100644 trustgraph/base/processor.py diff --git a/trustgraph/base/__init__.py b/trustgraph/base/__init__.py new file mode 100644 index 00000000..9d16af90 --- /dev/null +++ b/trustgraph/base/__init__.py @@ -0,0 +1,3 @@ + +from . processor import * + diff --git a/trustgraph/base/processor.py b/trustgraph/base/processor.py new file mode 100644 index 00000000..2a946282 --- /dev/null +++ b/trustgraph/base/processor.py @@ -0,0 +1,79 @@ + +import os +import argparse +import pulsar +import time + +from .. log_level import LogLevel + +class BaseProcessor: + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + + def __init__( + self, + pulsar_host=default_pulsar_host, + log_level=LogLevel.INFO, + ): + + print("BASE INIT") + + self.client = None + + if pulsar_host == None: + pulsar_host = default_pulsar_host + + self.pulsar_host = pulsar_host + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + def __del__(self): + + if self.client: + self.client.close() + + @staticmethod + def add_args(parser): + + parser.add_argument( + '-p', '--pulsar-host', + default=__class__.default_pulsar_host, + help=f'Pulsar host (default: {__class__.default_pulsar_host})', + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + @classmethod + def start(cls, prog, doc): + + parser = argparse.ArgumentParser( + prog=prog, + description=doc + ) + + cls.add_args(parser) + + args = parser.parse_args() + args = vars(args) + + try: + + p = cls(**args) + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + diff --git a/trustgraph/llm/ollama_text/llm.py b/trustgraph/llm/ollama_text/llm.py index b7a5fda9..e8024b69 100755 --- a/trustgraph/llm/ollama_text/llm.py +++ b/trustgraph/llm/ollama_text/llm.py @@ -15,19 +15,19 @@ import time from ... schema import TextCompletionRequest, TextCompletionResponse from ... log_level import LogLevel +from ... base import BaseProcessor -default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') default_input_queue = 'llm-complete-text' default_output_queue = 'llm-complete-text-response' default_subscriber = 'llm-ollama-text' default_model = 'gemma2' default_ollama = 'http://localhost:11434' -class Processor: +class Processor(BaseProcessor): def __init__( self, - pulsar_host=default_pulsar_host, + pulsar_host=None, input_queue=default_input_queue, output_queue=default_output_queue, subscriber=default_subscriber, @@ -36,11 +36,8 @@ class Processor: ollama=default_ollama, ): - self.client = None - - self.client = pulsar.Client( - pulsar_host, - logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + super(Processor, self).__init__( + pulsar_host=pulsar_host ) self.consumer = self.client.subscribe( @@ -90,85 +87,43 @@ class Processor: # Message failed to be processed self.consumer.negative_acknowledge(msg) - def __del__(self): + @staticmethod + def add_args(parser): - if self.client: - self.client.close() + BaseProcessor.add_args(parser) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-m', '--model', + default="gemma2", + help=f'LLM model (default: gemma2)' + ) + + parser.add_argument( + '-r', '--ollama', + default=default_ollama, + help=f'ollama (default: {default_ollama})' + ) def run(): - parser = argparse.ArgumentParser( - prog='llm-ollama-text', - description=__doc__, - ) - - parser.add_argument( - '-p', '--pulsar-host', - default=default_pulsar_host, - help=f'Pulsar host (default: {default_pulsar_host})', - ) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) - - parser.add_argument( - '-o', '--output-queue', - default=default_output_queue, - help=f'Output queue (default: {default_output_queue})' - ) - - parser.add_argument( - '-l', '--log-level', - type=LogLevel, - default=LogLevel.INFO, - choices=list(LogLevel), - help=f'Output queue (default: info)' - ) - - parser.add_argument( - '-m', '--model', - default="gemma2", - help=f'LLM model (default: gemma2)' - ) - - parser.add_argument( - '-r', '--ollama', - default=default_ollama, - help=f'ollama (default: {default_ollama})' - ) - - args = parser.parse_args() + Processor.start("llm-ollama-text", __doc__) - while True: - - try: - - p = Processor( - pulsar_host=args.pulsar_host, - input_queue=args.input_queue, - output_queue=args.output_queue, - subscriber=args.subscriber, - log_level=args.log_level, - model=args.model, - ollama=args.ollama, - ) - - p.run() - - except Exception as e: - - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - - time.sleep(10) -