diff --git a/trustgraph/base/processor.py b/trustgraph/base/processor.py index 2a946282..be3403ee 100644 --- a/trustgraph/base/processor.py +++ b/trustgraph/base/processor.py @@ -3,6 +3,7 @@ import os import argparse import pulsar import time +from pulsar.schema import JsonSchema from .. log_level import LogLevel @@ -16,8 +17,6 @@ class BaseProcessor: log_level=LogLevel.INFO, ): - print("BASE INIT") - self.client = None if pulsar_host == None: @@ -52,6 +51,9 @@ class BaseProcessor: help=f'Output queue (default: info)' ) + def run(self): + raise RuntimeError("Something should have implemented the run method") + @classmethod def start(cls, prog, doc): @@ -77,3 +79,64 @@ class BaseProcessor: time.sleep(10) +class Consumer(BaseProcessor): + + def __init__( + self, + pulsar_host=None, + log_level=LogLevel.INFO, + input_queue="input", + subscriber="subscriber", + request_schema=None, + ): + + super(Consumer, self).__init__( + pulsar_host=pulsar_host, + log_level=log_level, + ) + + if request_schema == None: + raise RuntimeError("request_schema must be specified") + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(request_schema), + ) + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + self.handle(msg) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + except Exception as e: + + print("Exception:", e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + @staticmethod + def add_args(parser, default_input_queue, default_subscriber): + + BaseProcessor.add_args(parser) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + diff --git a/trustgraph/llm/ollama_text/llm.py b/trustgraph/llm/ollama_text/llm.py index e8024b69..2646b736 100755 --- a/trustgraph/llm/ollama_text/llm.py +++ b/trustgraph/llm/ollama_text/llm.py @@ -15,7 +15,7 @@ import time from ... schema import TextCompletionRequest, TextCompletionResponse from ... log_level import LogLevel -from ... base import BaseProcessor +from ... base import Consumer default_input_queue = 'llm-complete-text' default_output_queue = 'llm-complete-text-response' @@ -23,7 +23,7 @@ default_subscriber = 'llm-ollama-text' default_model = 'gemma2' default_ollama = 'http://localhost:11434' -class Processor(BaseProcessor): +class Processor(Consumer): def __init__( self, @@ -37,12 +37,11 @@ class Processor(BaseProcessor): ): super(Processor, self).__init__( - pulsar_host=pulsar_host - ) - - self.consumer = self.client.subscribe( - input_queue, subscriber, - schema=JsonSchema(TextCompletionRequest), + pulsar_host=pulsar_host, + log_level=log_level, + input_queue=default_input_queue, + subscriber=default_subscriber, + request_schema=TextCompletionRequest, ) self.producer = self.client.create_producer( @@ -52,57 +51,29 @@ class Processor(BaseProcessor): self.llm = Ollama(base_url=ollama, model=model) - def run(self): + def handle(self, msg): - while True: + v = msg.value() - msg = self.consumer.receive() + # Sender-produced ID - try: + id = msg.properties()["id"] - v = msg.value() + print(f"Handling prompt {id}...", flush=True) - # Sender-produced ID + prompt = v.prompt + response = self.llm.invoke(prompt) - id = msg.properties()["id"] + print("Send response...", flush=True) + r = TextCompletionResponse(response=response) + self.producer.send(r, properties={"id": id}) - print(f"Handling prompt {id}...", flush=True) - - prompt = v.prompt - response = self.llm.invoke(prompt) - - print("Send response...", flush=True) - r = TextCompletionResponse(response=response) - self.producer.send(r, properties={"id": id}) - - print("Done.", flush=True) - - # Acknowledge successful processing of the message - self.consumer.acknowledge(msg) - - except Exception as e: - - print("Exception:", e, flush=True) - - # Message failed to be processed - self.consumer.negative_acknowledge(msg) + print("Done.", flush=True) @staticmethod def add_args(parser): - BaseProcessor.add_args(parser) - - parser.add_argument( - '-i', '--input-queue', - default=default_input_queue, - help=f'Input queue (default: {default_input_queue})' - ) - - parser.add_argument( - '-s', '--subscriber', - default=default_subscriber, - help=f'Queue subscriber name (default: {default_subscriber})' - ) + Consumer.add_args(parser, default_input_queue, default_subscriber) parser.add_argument( '-o', '--output-queue',