Added consumer

This commit is contained in:
Cyber MacGeddon 2024-07-17 15:25:40 +01:00
parent 605a455e07
commit 7b6f10de69
2 changed files with 84 additions and 50 deletions

View file

@ -3,6 +3,7 @@ import os
import argparse
import pulsar
import time
from pulsar.schema import JsonSchema
from .. log_level import LogLevel
@ -16,8 +17,6 @@ class BaseProcessor:
log_level=LogLevel.INFO,
):
print("BASE INIT")
self.client = None
if pulsar_host == None:
@ -52,6 +51,9 @@ class BaseProcessor:
help=f'Output queue (default: info)'
)
def run(self):
raise RuntimeError("Something should have implemented the run method")
@classmethod
def start(cls, prog, doc):
@ -77,3 +79,64 @@ class BaseProcessor:
time.sleep(10)
class Consumer(BaseProcessor):
def __init__(
self,
pulsar_host=None,
log_level=LogLevel.INFO,
input_queue="input",
subscriber="subscriber",
request_schema=None,
):
super(Consumer, self).__init__(
pulsar_host=pulsar_host,
log_level=log_level,
)
if request_schema == None:
raise RuntimeError("request_schema must be specified")
self.consumer = self.client.subscribe(
input_queue, subscriber,
schema=JsonSchema(request_schema),
)
def run(self):
while True:
msg = self.consumer.receive()
try:
self.handle(msg)
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
except Exception as e:
print("Exception:", e, flush=True)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
@staticmethod
def add_args(parser, default_input_queue, default_subscriber):
BaseProcessor.add_args(parser)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)

View file

@ -15,7 +15,7 @@ import time
from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel
from ... base import BaseProcessor
from ... base import Consumer
default_input_queue = 'llm-complete-text'
default_output_queue = 'llm-complete-text-response'
@ -23,7 +23,7 @@ default_subscriber = 'llm-ollama-text'
default_model = 'gemma2'
default_ollama = 'http://localhost:11434'
class Processor(BaseProcessor):
class Processor(Consumer):
def __init__(
self,
@ -37,12 +37,11 @@ class Processor(BaseProcessor):
):
super(Processor, self).__init__(
pulsar_host=pulsar_host
)
self.consumer = self.client.subscribe(
input_queue, subscriber,
schema=JsonSchema(TextCompletionRequest),
pulsar_host=pulsar_host,
log_level=log_level,
input_queue=default_input_queue,
subscriber=default_subscriber,
request_schema=TextCompletionRequest,
)
self.producer = self.client.create_producer(
@ -52,57 +51,29 @@ class Processor(BaseProcessor):
self.llm = Ollama(base_url=ollama, model=model)
def run(self):
def handle(self, msg):
while True:
v = msg.value()
msg = self.consumer.receive()
# Sender-produced ID
try:
id = msg.properties()["id"]
v = msg.value()
print(f"Handling prompt {id}...", flush=True)
# Sender-produced ID
prompt = v.prompt
response = self.llm.invoke(prompt)
id = msg.properties()["id"]
print("Send response...", flush=True)
r = TextCompletionResponse(response=response)
self.producer.send(r, properties={"id": id})
print(f"Handling prompt {id}...", flush=True)
prompt = v.prompt
response = self.llm.invoke(prompt)
print("Send response...", flush=True)
r = TextCompletionResponse(response=response)
self.producer.send(r, properties={"id": id})
print("Done.", flush=True)
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
except Exception as e:
print("Exception:", e, flush=True)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
print("Done.", flush=True)
@staticmethod
def add_args(parser):
BaseProcessor.add_args(parser)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
Consumer.add_args(parser, default_input_queue, default_subscriber)
parser.add_argument(
'-o', '--output-queue',