diff --git a/trustgraph/llm/azure_text/llm.py b/trustgraph/llm/azure_text/llm.py
index dba4de66..f7d7ef6f 100755
--- a/trustgraph/llm/azure_text/llm.py
+++ b/trustgraph/llm/azure_text/llm.py
@@ -4,30 +4,21 @@ Simple LLM service, performs text prompt completion using the Azure
 serverless endpoint service.
 Input is prompt, output is response.
 """
-import pulsar
-from pulsar.schema import JsonSchema
-import tempfile
-import base64
-import os
-import argparse
-from langchain_community.llms import Ollama
 import requests
-import time
-import json

 from ... schema import TextCompletionRequest, TextCompletionResponse
 from ... log_level import LogLevel
+from ... base import ConsumerProducer

-default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
 default_input_queue = 'llm-complete-text'
 default_output_queue = 'llm-complete-text-response'
 default_subscriber = 'llm-azure-text'

-class Processor:
+class Processor(ConsumerProducer):

     def __init__(
         self,
-        pulsar_host=default_pulsar_host,
+        pulsar_host=None,
         input_queue=default_input_queue,
         output_queue=default_output_queue,
         subscriber=default_subscriber,
@@ -36,21 +27,14 @@ class Processor:
         token=None,
     ):

-        self.client = None
-
-        self.client = pulsar.Client(
-            pulsar_host,
-            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
-        )
-
-        self.consumer = self.client.subscribe(
-            input_queue, subscriber,
-            schema=JsonSchema(TextCompletionRequest),
-        )
-
-        self.producer = self.client.create_producer(
-            topic=output_queue,
-            schema=JsonSchema(TextCompletionResponse),
+        super(Processor, self).__init__(
+            pulsar_host=pulsar_host,
+            log_level=log_level,
+            input_queue=input_queue,
+            output_queue=output_queue,
+            subscriber=subscriber,
+            input_schema=TextCompletionRequest,
+            output_schema=TextCompletionResponse,
         )

         self.endpoint = endpoint
@@ -96,120 +80,47 @@ class Processor:

         return message_content

-    def run(self):
+    def handle(self, msg):

-        while True:
+        v = msg.value()

-            msg = self.consumer.receive()
+        # Sender-produced ID

-            try:
+        id = msg.properties()["id"]

-                v = msg.value()
+        print(f"Handling prompt {id}...", flush=True)

-                # Sender-produced ID
+        prompt = self.build_prompt(
+            "You are a helpful chatbot",
+            v.prompt
+        )

-                id = msg.properties()["id"]
+        response = self.call_llm(prompt)

-                print(f"Handling prompt {id}...", flush=True)
+        print("Send response...", flush=True)
+        r = TextCompletionResponse(response=response)
+        self.producer.send(r, properties={"id": id})

-                prompt = self.build_prompt(
-                    "You are a helpful chatbot",
-                    v.prompt
-                )
-
-                response = self.call_llm(prompt)
+        print("Done.", flush=True)

-                print("Send response...", flush=True)
-                r = TextCompletionResponse(response=response)
-                self.producer.send(r, properties={"id": id})
+    @staticmethod
+    def add_args(parser):

-                print("Done.", flush=True)
+        ConsumerProducer.add_args(
+            parser, default_input_queue, default_subscriber,
+            default_output_queue,
+        )

-                # Acknowledge successful processing of the message
-                self.consumer.acknowledge(msg)
+        parser.add_argument(
+            '-e', '--endpoint',
+            help=f'LLM model endpoint'
+        )

-            except Exception as e:
-
-                print("Exception:", e, flush=True)
-
-                # Message failed to be processed
-                self.consumer.negative_acknowledge(msg)
-
-    def __del__(self):
-        self.client.close()
+        parser.add_argument(
+            '-k', '--token',
+            help=f'LLM model token'
+        )

 def run():
-
-    parser = argparse.ArgumentParser(
-        prog='llm-ollama-text',
-        description=__doc__,
-    )
-
-    parser.add_argument(
-        '-p', '--pulsar-host',
-        default=default_pulsar_host,
-        help=f'Pulsar host (default: {default_pulsar_host})',
-    )
-
-    parser.add_argument(
-        '-i', '--input-queue',
-        default=default_input_queue,
-        help=f'Input queue (default: {default_input_queue})'
-    )
-
-    parser.add_argument(
-        '-s', '--subscriber',
-        default=default_subscriber,
-        help=f'Queue subscriber name (default: {default_subscriber})'
-    )
-
-    parser.add_argument(
-        '-o', '--output-queue',
-        default=default_output_queue,
-        help=f'Output queue (default: {default_output_queue})'
-    )
-
-    parser.add_argument(
-        '-l', '--log-level',
-        type=LogLevel,
-        default=LogLevel.INFO,
-        choices=list(LogLevel),
-        help=f'Output queue (default: info)'
-    )
-
-    parser.add_argument(
-        '-e', '--endpoint',
-        help=f'LLM model endpoint'
-    )
-
-    parser.add_argument(
-        '-k', '--token',
-        help=f'LLM model token'
-    )
-
-    args = parser.parse_args()
-
-    while True:
-
-        try:
-
-            p = Processor(
-                pulsar_host=args.pulsar_host,
-                input_queue=args.input_queue,
-                output_queue=args.output_queue,
-                subscriber=args.subscriber,
-                log_level=args.log_level,
-                endpoint=args.endpoint,
-                token=args.token,
-            )
-
-            p.run()
-
-        except Exception as e:
-
-            print("Exception:", e, flush=True)
-            print("Will retry...", flush=True)
-
-            time.sleep(10)
-
-
+
+    Processor.start("llm-azure-text", __doc__)
diff --git a/trustgraph/llm/claude_text/llm.py b/trustgraph/llm/claude_text/llm.py
index 00e6998e..af516b71 100755
--- a/trustgraph/llm/claude_text/llm.py
+++ b/trustgraph/llm/claude_text/llm.py
@@ -4,19 +4,12 @@ Simple LLM service, performs text prompt completion using Claude.
 Input is prompt, output is response.
 """
-import pulsar
-from pulsar.schema import JsonSchema
-import tempfile
-import base64
-import os
-import argparse
 import anthropic
-import time

 from ... schema import TextCompletionRequest, TextCompletionResponse
 from ... log_level import LogLevel
+from ... base import ConsumerProducer

-default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
 default_input_queue = 'llm-complete-text'
 default_output_queue = 'llm-complete-text-response'
 default_subscriber = 'llm-claude-text'
@@ -26,30 +19,23 @@ class Processor:

     def __init__(
         self,
-        pulsar_host=default_pulsar_host,
+        pulsar_host=None,
         input_queue=default_input_queue,
         output_queue=default_output_queue,
         subscriber=default_subscriber,
         log_level=LogLevel.INFO,
         model=default_model,
-        api_key,
+        api_key="",
     ):

-        self.client = None
-
-        self.client = pulsar.Client(
-            pulsar_host,
-            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
-        )
-
-        self.consumer = self.client.subscribe(
-            input_queue, subscriber,
-            schema=JsonSchema(TextCompletionRequest),
-        )
-
-        self.producer = self.client.create_producer(
-            topic=output_queue,
-            schema=JsonSchema(TextCompletionResponse),
+        super(Processor, self).__init__(
+            pulsar_host=pulsar_host,
+            log_level=log_level,
+            input_queue=input_queue,
+            output_queue=output_queue,
+            subscriber=subscriber,
+            input_schema=TextCompletionRequest,
+            output_schema=TextCompletionResponse,
         )

         self.model = model
@@ -58,135 +44,65 @@ class Processor:

         print("Initialised", flush=True)

-    def run(self):
+    def handle(self, msg):

-        while True:
+        v = msg.value()

-            msg = self.consumer.receive()
+        # Sender-produced ID

-            try:
+        id = msg.properties()["id"]

-                v = msg.value()
+        print(f"Handling prompt {id}...", flush=True)

-                # Sender-produced ID
-
-                id = msg.properties()["id"]
-
-                print(f"Handling prompt {id}...", flush=True)
-
-                prompt = v.prompt
-                response = message = self.claude.messages.create(
-                    model=self.model,
-                    max_tokens=1000,
-                    temperature=0.1,
-                    system = "You are a helpful chatbot.",
-                    messages=[
+        prompt = v.prompt
+        response = message = self.claude.messages.create(
+            model=self.model,
+            max_tokens=1000,
+            temperature=0.1,
+            system = "You are a helpful chatbot.",
+            messages=[
+                {
+                    "role": "user",
+                    "content": [
                         {
-                            "role": "user",
-                            "content": [
-                                {
-                                    "type": "text",
-                                    "text": prompt
-                                }
-                            ]
+                            "type": "text",
+                            "text": prompt
                         }
                     ]
-                )
+                }
+            ]
+        )

-                resp = response.content[0].text
+        resp = response.content[0].text

-                print(resp, flush=True)
+        print(resp, flush=True)

-                print("Send response...", flush=True)
-                r = TextCompletionResponse(response=resp)
-                self.producer.send(r, properties={"id": id})
+        print("Send response...", flush=True)
+        r = TextCompletionResponse(response=resp)
+        self.send(r, properties={"id": id})

-                print("Done.", flush=True)
+        print("Done.", flush=True)

-                # Acknowledge successful processing of the message
-                self.consumer.acknowledge(msg)
+    @staticmethod
+    def add_args(parser):

-            except Exception as e:
+        ConsumerProducer.add_args(
+            parser, default_input_queue, default_subscriber,
+            default_output_queue,
+        )

-                print("Exception:", e, flush=True)
+        parser.add_argument(
+            '-m', '--model',
+            default="claude-3-5-sonnet-20240620",
+            help=f'LLM model (default: claude-3-5-sonnet-20240620)'
+        )

-                # Message failed to be processed
-                self.consumer.negative_acknowledge(msg)
-
-    def __del__(self):
-        self.client.close()
+        parser.add_argument(
+            '-k', '--api-key',
+            help=f'Claude API key'
+        )

 def run():

-    parser = argparse.ArgumentParser(
-        prog='llm-ollama-text',
-        description=__doc__,
-    )
-
-    parser.add_argument(
-        '-p', '--pulsar-host',
-        default=default_pulsar_host,
-        help=f'Pulsar host (default: {default_pulsar_host})',
-    )
-
-    parser.add_argument(
-        '-i', '--input-queue',
-        default=default_input_queue,
-        help=f'Input queue (default: {default_input_queue})'
-    )
-
-    parser.add_argument(
-        '-s', '--subscriber',
-        default=default_subscriber,
-        help=f'Queue subscriber name (default: {default_subscriber})'
-    )
-
-    parser.add_argument(
-        '-o', '--output-queue',
-        default=default_output_queue,
-        help=f'Output queue (default: {default_output_queue})'
-    )
-
-    parser.add_argument(
-        '-l', '--log-level',
-        type=LogLevel,
-        default=LogLevel.INFO,
-        choices=list(LogLevel),
-        help=f'Output queue (default: info)'
-    )
-
-    parser.add_argument(
-        '-m', '--model',
-        default="claude-3-5-sonnet-20240620",
-        help=f'LLM model (default: claude-3-5-sonnet-20240620)'
-    )
-
-    parser.add_argument(
-        '-k', '--api-key',
-        help=f'Claude API key'
-    )
-
-    args = parser.parse_args()
-
-    while True:
-
-        try:
-
-            p = Processor(
-                pulsar_host=args.pulsar_host,
-                input_queue=args.input_queue,
-                output_queue=args.output_queue,
-                subscriber=args.subscriber,
-                log_level=args.log_level,
-                model=args.model,
-                api_key=args.api_key,
-            )
-
-            p.run()
-
-        except Exception as e:
-
-            print("Exception:", e, flush=True)
-            print("Will retry...", flush=True)
-
-            time.sleep(10)
+    Processor.start("llm-claude-text", __doc__)
+
diff --git a/trustgraph/llm/vertexai_text/llm.py b/trustgraph/llm/vertexai_text/llm.py
index f4d55a18..c6c6c81d 100755
--- a/trustgraph/llm/vertexai_text/llm.py
+++ b/trustgraph/llm/vertexai_text/llm.py
@@ -4,12 +4,6 @@ Simple LLM service, performs text prompt completion using VertexAI on
 Google Cloud. Input is prompt, output is response.
 """
-import pulsar
-from pulsar.schema import JsonSchema
-import tempfile
-import base64
-import os
-import argparse
 import vertexai
 import time
@@ -29,41 +23,34 @@ from vertexai.preview.generative_models import (

 from ... schema import TextCompletionRequest, TextCompletionResponse
 from ... log_level import LogLevel
+from ... base import ConsumerProducer

-default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
 default_input_queue = 'llm-complete-text'
 default_output_queue = 'llm-complete-text-response'
 default_subscriber = 'llm-vertexai-text'

-class Processor:
+class Processor(ConsumerProducer):

     def __init__(
         self,
-        pulsar_host=default_pulsar_host,
+        pulsar_host=None,
         input_queue=default_input_queue,
         output_queue=default_output_queue,
         subscriber=default_subscriber,
         log_level=LogLevel.INFO,
         region="us-west1",
         model="gemini-1.0-pro-001",
-        credentials,
+        private_key=None,
     ):

-        self.client = None
-
-        self.client = pulsar.Client(
-            pulsar_host,
-            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
-        )
-
-        self.consumer = self.client.subscribe(
-            input_queue, subscriber,
-            schema=JsonSchema(TextCompletionRequest),
-        )
-
-        self.producer = self.client.create_producer(
-            topic=output_queue,
-            schema=JsonSchema(TextCompletionResponse),
+        super(Processor, self).__init__(
+            pulsar_host=pulsar_host,
+            log_level=log_level,
+            input_queue=input_queue,
+            output_queue=output_queue,
+            subscriber=subscriber,
+            input_schema=TextCompletionRequest,
+            output_schema=TextCompletionResponse,
         )

         self.parameters = {
@@ -95,6 +82,11 @@ class Processor:

         print("Initialise VertexAI...", flush=True)

+        if private_key:
+            credentials = service_account.Credentials.from_service_account_file(private_key)
+        else:
+            credentials = None
+
         if credentials:
             vertexai.init(
                 location=region,
@@ -111,148 +103,74 @@ class Processor:

         print("Initialisation complete", flush=True)

-    def run(self):
+    def handle(self, msg):

-        while True:
+        try:

-            msg = self.consumer.receive()
+            v = msg.value()

-            try:
+            # Sender-produced ID

-                v = msg.value()
+            id = msg.properties()["id"]

-                # Sender-produced ID
+            print(f"Handling prompt {id}...", flush=True)

-                id = msg.properties()["id"]
+            prompt = v.prompt

-                print(f"Handling prompt {id}...", flush=True)
+            resp = self.llm.generate_content(
+                prompt, generation_config=self.generation_config,
+                safety_settings=self.safety_settings
+            )

-                prompt = v.prompt
+            resp = resp.text

-                resp = self.llm.generate_content(
-                    prompt, generation_config=self.generation_config,
-                    safety_settings=self.safety_settings
-                )
+            resp = resp.replace("```json", "")
+            resp = resp.replace("```", "")

-                resp = resp.text
+            print("Send response...", flush=True)
+            r = TextCompletionResponse(response=resp)
+            self.producer.send(r, properties={"id": id})

-                resp = resp.replace("```json", "")
-                resp = resp.replace("```", "")
+            print("Done.", flush=True)

-                print("Send response...", flush=True)
-                r = TextCompletionResponse(response=resp)
-                self.producer.send(r, properties={"id": id})
+            # Acknowledge successful processing of the message
+            self.consumer.acknowledge(msg)

-                print("Done.", flush=True)
+        except google.api_core.exceptions.ResourceExhausted:

-                # Acknowledge successful processing of the message
-                self.consumer.acknowledge(msg)
+            print("429, resource busy, sleeping", flush=True)
+            time.sleep(15)
+            self.consumer.negative_acknowledge(msg)

-            except google.api_core.exceptions.ResourceExhausted:
+        # Let other exceptions fall through

-                print("429, resource busy, sleeping", flush=True)
-                time.sleep(15)
-                self.consumer.negative_acknowledge(msg)
+    @staticmethod
+    def add_args(parser):

-            except Exception as e:
+        ConsumerProducer.add_args(
+            parser, default_input_queue, default_subscriber,
+            default_output_queue,
+        )

-                print("Exception:", e, flush=True)
+        parser.add_argument(
+            '-m', '--model',
+            default="gemini-1.0-pro-001",
+            help=f'LLM model (default: gemini-1.0-pro-001)'
+        )
+        # Also: text-bison-32k

-                # Message failed to be processed
-                self.consumer.negative_acknowledge(msg)
+        parser.add_argument(
+            '-k', '--private-key',
+            help=f'Google Cloud private JSON file'
+        )

-    def __del__(self):
-
-        if self.client:
-            self.client.close()
+        parser.add_argument(
+            '-r', '--region',
+            default='us-west1',
+            help=f'Google Cloud region (default: us-west1)',
+        )

 def run():

-    parser = argparse.ArgumentParser(
-        prog='llm-ollama-text',
-        description=__doc__,
-    )
-
-    parser.add_argument(
-        '-p', '--pulsar-host',
-        default=default_pulsar_host,
-        help=f'Pulsar host (default: {default_pulsar_host})',
-    )
-
-    parser.add_argument(
-        '-i', '--input-queue',
-        default=default_input_queue,
-        help=f'Input queue (default: {default_input_queue})'
-    )
-
-    parser.add_argument(
-        '-s', '--subscriber',
-        default=default_subscriber,
-        help=f'Queue subscriber name (default: {default_subscriber})'
-    )
-
-    parser.add_argument(
-        '-o', '--output-queue',
-        default=default_output_queue,
-        help=f'Output queue (default: {default_output_queue})'
-    )
-
-    parser.add_argument(
-        '-l', '--log-level',
-        type=LogLevel,
-        default=LogLevel.INFO,
-        choices=list(LogLevel),
-        help=f'Output queue (default: info)'
-    )
-
-    parser.add_argument(
-        '-m', '--model',
-        default="gemini-1.0-pro-001",
-        help=f'LLM model (default: gemini-1.0-pro-001)'
-    )
-    # Also: text-bison-32k
-
-    parser.add_argument(
-        '-k', '--private-key',
-        help=f'Google Cloud private JSON file'
-    )
-
-    parser.add_argument(
-        '-r', '--region',
-        default='us-west1',
-        help=f'Google Cloud region (default: us-west1)',
-    )
-
-    args = parser.parse_args()
-
-    if args.private_key:
-        credentials = service_account.Credentials.from_service_account_file(
-            args.private_key
-        )
-    else:
-        credentials = None
-
-    while True:
-
-        try:
-
-            p = Processor(
-                pulsar_host=args.pulsar_host,
-                input_queue=args.input_queue,
-                output_queue=args.output_queue,
-                subscriber=args.subscriber,
-                log_level=args.log_level,
-                credentials=credentials,
-                region=args.region,
-                model=args.model,
-            )
-
-            p.run()
-
-        except Exception as e:
-
-            print("Exception:", e, flush=True)
-            print("Will retry...", flush=True)
-
-            time.sleep(10)
+    Processor.start("llm-vertexai-text", __doc__)
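
Note on the shared base class: all three processors now push their Pulsar wiring, the receive/acknowledge loop, argument parsing, and the retry-on-failure startup into ConsumerProducer, imported from trustgraph/base. That class is not part of this diff. The sketch below is a hypothetical reconstruction of its contract, inferred only from the call sites above (the keyword arguments passed to super().__init__(), the handle() override, the send() helper, add_args(), and Processor.start()); the real implementation may well differ.

# Hypothetical sketch of the ConsumerProducer contract -- inferred from the
# call sites in this diff, not the actual trustgraph/base implementation.

import argparse
import os
import time

import pulsar
from pulsar.schema import JsonSchema


class ConsumerProducer:

    def __init__(
        self, pulsar_host=None, log_level=None,
        input_queue=None, output_queue=None, subscriber=None,
        input_schema=None, output_schema=None,
    ):
        # log_level is accepted for compatibility with the subclass calls;
        # logger setup is omitted in this sketch.

        # The PULSAR_HOST fallback each service used to define for itself.
        if pulsar_host is None:
            pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')

        # One client, consumer and producer: the wiring the old
        # per-service __init__ bodies duplicated.
        self.client = pulsar.Client(pulsar_host)

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(input_schema),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(output_schema),
        )

    def handle(self, msg):
        # Per-service hook; each Processor above overrides this.
        raise NotImplementedError

    def send(self, msg, properties=None):
        # Convenience wrapper used by the Claude processor.
        self.producer.send(msg, properties=properties)

    def run(self):
        # The receive/ack/nack loop the old run() methods repeated.
        # Assumed to acknowledge on success, so handlers need not.
        while True:
            msg = self.consumer.receive()
            try:
                self.handle(msg)
                self.consumer.acknowledge(msg)
            except Exception as e:
                print("Exception:", e, flush=True)
                self.consumer.negative_acknowledge(msg)

    @staticmethod
    def add_args(parser, default_input_queue, default_subscriber,
                 default_output_queue):
        # The CLI flags every service used to declare by hand.
        parser.add_argument('-p', '--pulsar-host')
        parser.add_argument('-i', '--input-queue',
                            default=default_input_queue)
        parser.add_argument('-s', '--subscriber',
                            default=default_subscriber)
        parser.add_argument('-o', '--output-queue',
                            default=default_output_queue)

    @classmethod
    def start(cls, prog, doc):
        # Parse arguments (base flags plus subclass extras), then
        # construct and run forever, retrying on failure: the loop the
        # old module-level run() functions contained.
        parser = argparse.ArgumentParser(prog=prog, description=doc)
        cls.add_args(parser)
        args = parser.parse_args()
        while True:
            try:
                cls(**vars(args)).run()
            except Exception as e:
                print("Exception:", e, flush=True)
                print("Will retry...", flush=True)
                time.sleep(10)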