Base class

This commit is contained in:
Cyber MacGeddon 2024-07-17 15:13:12 +01:00
parent d007a5c97c
commit 605a455e07
3 changed files with 121 additions and 84 deletions

View file

@ -0,0 +1,3 @@
from . processor import *

View file

@ -0,0 +1,79 @@
import os
import argparse
import pulsar
import time
from .. log_level import LogLevel
class BaseProcessor:
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
def __init__(
self,
pulsar_host=default_pulsar_host,
log_level=LogLevel.INFO,
):
print("BASE INIT")
self.client = None
if pulsar_host == None:
pulsar_host = default_pulsar_host
self.pulsar_host = pulsar_host
self.client = pulsar.Client(
pulsar_host,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
)
def __del__(self):
if self.client:
self.client.close()
@staticmethod
def add_args(parser):
parser.add_argument(
'-p', '--pulsar-host',
default=__class__.default_pulsar_host,
help=f'Pulsar host (default: {__class__.default_pulsar_host})',
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
@classmethod
def start(cls, prog, doc):
parser = argparse.ArgumentParser(
prog=prog,
description=doc
)
cls.add_args(parser)
args = parser.parse_args()
args = vars(args)
try:
p = cls(**args)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)

View file

@ -15,19 +15,19 @@ import time
from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel
from ... base import BaseProcessor
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'llm-complete-text'
default_output_queue = 'llm-complete-text-response'
default_subscriber = 'llm-ollama-text'
default_model = 'gemma2'
default_ollama = 'http://localhost:11434'
class Processor:
class Processor(BaseProcessor):
def __init__(
self,
pulsar_host=default_pulsar_host,
pulsar_host=None,
input_queue=default_input_queue,
output_queue=default_output_queue,
subscriber=default_subscriber,
@ -36,11 +36,8 @@ class Processor:
ollama=default_ollama,
):
self.client = None
self.client = pulsar.Client(
pulsar_host,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
super(Processor, self).__init__(
pulsar_host=pulsar_host
)
self.consumer = self.client.subscribe(
@ -90,85 +87,43 @@ class Processor:
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
@staticmethod
def add_args(parser):
if self.client:
self.client.close()
BaseProcessor.add_args(parser)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-m', '--model',
default="gemma2",
help=f'LLM model (default: gemma2)'
)
parser.add_argument(
'-r', '--ollama',
default=default_ollama,
help=f'ollama (default: {default_ollama})'
)
def run():
parser = argparse.ArgumentParser(
prog='llm-ollama-text',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
parser.add_argument(
'-m', '--model',
default="gemma2",
help=f'LLM model (default: gemma2)'
)
parser.add_argument(
'-r', '--ollama',
default=default_ollama,
help=f'ollama (default: {default_ollama})'
)
args = parser.parse_args()
Processor.start("llm-ollama-text", __doc__)
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
model=args.model,
ollama=args.ollama,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)