Other LLMs

This commit is contained in:
Cyber MacGeddon 2024-07-17 17:18:24 +01:00
parent ab616b2779
commit 96a12efd70
3 changed files with 159 additions and 414 deletions

View file

@@ -4,30 +4,21 @@ Simple LLM service, performs text prompt completion using the Azure
serverless endpoint service. Input is prompt, output is response.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
from langchain_community.llms import Ollama
import requests
import time
import json
from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'llm-complete-text'
default_output_queue = 'llm-complete-text-response'
default_subscriber = 'llm-azure-text'
class Processor:
class Processor(ConsumerProducer):
def __init__(
self,
pulsar_host=default_pulsar_host,
pulsar_host=None,
input_queue=default_input_queue,
output_queue=default_output_queue,
subscriber=default_subscriber,
@@ -36,21 +27,14 @@ class Processor:
token=None,
):
self.client = None
self.client = pulsar.Client(
pulsar_host,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
)
self.consumer = self.client.subscribe(
input_queue, subscriber,
schema=JsonSchema(TextCompletionRequest),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(TextCompletionResponse),
super(Processor, self).__init__(
pulsar_host=pulsar_host,
log_level=log_level,
input_queue=input_queue,
output_queue=output_queue,
subscriber=subscriber,
input_schema=TextCompletionRequest,
output_schema=TextCompletionResponse,
)
self.endpoint = endpoint
@@ -96,120 +80,47 @@ class Processor:
return message_content
def run(self):
def handle(self, msg):
while True:
v = msg.value()
msg = self.consumer.receive()
# Sender-produced ID
try:
id = msg.properties()["id"]
v = msg.value()
print(f"Handling prompt {id}...", flush=True)
# Sender-produced ID
prompt = self.build_prompt(
"You are a helpful chatbot",
v.prompt
)
id = msg.properties()["id"]
response = self.call_llm(prompt)
print(f"Handling prompt {id}...", flush=True)
print("Send response...", flush=True)
r = TextCompletionResponse(response=response)
self.producer.send(r, properties={"id": id})
prompt = self.build_prompt(
"You are a helpful chatbot",
v.prompt
)
response = self.call_llm(prompt)
print("Done.", flush=True)
print("Send response...", flush=True)
r = TextCompletionResponse(response=response)
self.producer.send(r, properties={"id": id})
@staticmethod
def add_args(parser):
print("Done.", flush=True)
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
parser.add_argument(
'-e', '--endpoint',
help=f'LLM model endpoint'
)
except Exception as e:
print("Exception:", e, flush=True)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
self.client.close()
parser.add_argument(
'-k', '--token',
help=f'LLM model token'
)
def run():
parser = argparse.ArgumentParser(
prog='llm-ollama-text',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
parser.add_argument(
'-e', '--endpoint',
help=f'LLM model endpoint'
)
parser.add_argument(
'-k', '--token',
help=f'LLM model token'
)
args = parser.parse_args()
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
endpoint=args.endpoint,
token=args.token,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)
Processor.start("llm-azure-text", __doc__)

View file

@@ -4,19 +4,12 @@ Simple LLM service, performs text prompt completion using Claude.
Input is prompt, output is response.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
import anthropic
import time
from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'llm-complete-text'
default_output_queue = 'llm-complete-text-response'
default_subscriber = 'llm-claude-text'
@@ -26,30 +19,23 @@ class Processor:
def __init__(
self,
pulsar_host=default_pulsar_host,
pulsar_host=None,
input_queue=default_input_queue,
output_queue=default_output_queue,
subscriber=default_subscriber,
log_level=LogLevel.INFO,
model=default_model,
api_key,
api_key="",
):
self.client = None
self.client = pulsar.Client(
pulsar_host,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
)
self.consumer = self.client.subscribe(
input_queue, subscriber,
schema=JsonSchema(TextCompletionRequest),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(TextCompletionResponse),
super(Processor, self).__init__(
pulsar_host=pulsar_host,
log_level=log_level,
input_queue=input_queue,
output_queue=output_queue,
subscriber=subscriber,
input_schema=TextCompletionRequest,
output_schema=TextCompletionResponse,
)
self.model = model
@@ -58,135 +44,65 @@ class Processor:
print("Initialised", flush=True)
def run(self):
def handle(self, msg):
while True:
v = msg.value()
msg = self.consumer.receive()
# Sender-produced ID
try:
id = msg.properties()["id"]
v = msg.value()
print(f"Handling prompt {id}...", flush=True)
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
prompt = v.prompt
response = message = self.claude.messages.create(
model=self.model,
max_tokens=1000,
temperature=0.1,
system = "You are a helpful chatbot.",
messages=[
prompt = v.prompt
response = message = self.claude.messages.create(
model=self.model,
max_tokens=1000,
temperature=0.1,
system = "You are a helpful chatbot.",
messages=[
{
"role": "user",
"content": [
{
"role": "user",
"content": [
{
"type": "text",
"text": prompt
}
]
"type": "text",
"text": prompt
}
]
)
}
]
)
resp = response.content[0].text
print(resp, flush=True)
resp = response.content[0].text
print(resp, flush=True)
print("Send response...", flush=True)
r = TextCompletionResponse(response=resp)
self.producer.send(r, properties={"id": id})
print("Send response...", flush=True)
r = TextCompletionResponse(response=resp)
self.send(r, properties={"id": id})
print("Done.", flush=True)
print("Done.", flush=True)
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
@staticmethod
def add_args(parser):
except Exception as e:
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
print("Exception:", e, flush=True)
parser.add_argument(
'-m', '--model',
default="claude-3-5-sonnet-20240620",
help=f'LLM model (default: claude-3-5-sonnet-20240620)'
)
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
def __del__(self):
self.client.close()
parser.add_argument(
'-k', '--api-key',
help=f'Claude API key'
)
def run():
parser = argparse.ArgumentParser(
prog='llm-ollama-text',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
parser.add_argument(
'-m', '--model',
default="claude-3-5-sonnet-20240620",
help=f'LLM model (default: claude-3-5-sonnet-20240620)'
)
parser.add_argument(
'-k', '--api-key',
help=f'Claude API key'
)
args = parser.parse_args()
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
model=args.model,
api_key=args.api_key,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)
Processor.start("llm-claude-text", __doc__)

View file

@@ -4,12 +4,6 @@ Simple LLM service, performs text prompt completion using VertexAI on
Google Cloud. Input is prompt, output is response.
"""
import pulsar
from pulsar.schema import JsonSchema
import tempfile
import base64
import os
import argparse
import vertexai
import time
@@ -29,41 +23,34 @@ from vertexai.preview.generative_models import (
from ... schema import TextCompletionRequest, TextCompletionResponse
from ... log_level import LogLevel
from ... base import ConsumerProducer
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_input_queue = 'llm-complete-text'
default_output_queue = 'llm-complete-text-response'
default_subscriber = 'llm-vertexai-text'
class Processor:
class Processor(ConsumerProducer):
def __init__(
self,
pulsar_host=default_pulsar_host,
pulsar_host=None,
input_queue=default_input_queue,
output_queue=default_output_queue,
subscriber=default_subscriber,
log_level=LogLevel.INFO,
region="us-west1",
model="gemini-1.0-pro-001",
credentials,
private_key=None,
):
self.client = None
self.client = pulsar.Client(
pulsar_host,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
)
self.consumer = self.client.subscribe(
input_queue, subscriber,
schema=JsonSchema(TextCompletionRequest),
)
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(TextCompletionResponse),
super(Processor, self).__init__(
pulsar_host=pulsar_host,
log_level=log_level,
input_queue=input_queue,
output_queue=output_queue,
subscriber=subscriber,
input_schema=TextCompletionRequest,
output_schema=TextCompletionResponse,
)
self.parameters = {
@@ -95,6 +82,11 @@ class Processor:
print("Initialise VertexAI...", flush=True)
if private_key:
credentials = service_account.Credentials.from_service_account_file(private_key)
else:
credentials = None
if credentials:
vertexai.init(
location=region,
@@ -111,148 +103,74 @@ class Processor:
print("Initialisation complete", flush=True)
def run(self):
def handle(self, msg):
while True:
try:
msg = self.consumer.receive()
v = msg.value()
try:
# Sender-produced ID
v = msg.value()
id = msg.properties()["id"]
# Sender-produced ID
print(f"Handling prompt {id}...", flush=True)
id = msg.properties()["id"]
prompt = v.prompt
print(f"Handling prompt {id}...", flush=True)
resp = self.llm.generate_content(
prompt, generation_config=self.generation_config,
safety_settings=self.safety_settings
)
prompt = v.prompt
resp = resp.text
resp = self.llm.generate_content(
prompt, generation_config=self.generation_config,
safety_settings=self.safety_settings
)
resp = resp.replace("```json", "")
resp = resp.replace("```", "")
resp = resp.text
print("Send response...", flush=True)
r = TextCompletionResponse(response=resp)
self.producer.send(r, properties={"id": id})
resp = resp.replace("```json", "")
resp = resp.replace("```", "")
print("Done.", flush=True)
print("Send response...", flush=True)
r = TextCompletionResponse(response=resp)
self.producer.send(r, properties={"id": id})
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
print("Done.", flush=True)
except google.api_core.exceptions.ResourceExhausted:
# Acknowledge successful processing of the message
self.consumer.acknowledge(msg)
print("429, resource busy, sleeping", flush=True)
time.sleep(15)
self.consumer.negative_acknowledge(msg)
except google.api_core.exceptions.ResourceExhausted:
# Let other exceptions fall through
print("429, resource busy, sleeping", flush=True)
time.sleep(15)
self.consumer.negative_acknowledge(msg)
@staticmethod
def add_args(parser):
except Exception as e:
ConsumerProducer.add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
)
print("Exception:", e, flush=True)
parser.add_argument(
'-m', '--model',
default="gemini-1.0-pro-001",
help=f'LLM model (default: gemini-1.0-pro-001)'
)
# Also: text-bison-32k
# Message failed to be processed
self.consumer.negative_acknowledge(msg)
parser.add_argument(
'-k', '--private-key',
help=f'Google Cloud private JSON file'
)
def __del__(self):
if self.client:
self.client.close()
parser.add_argument(
'-r', '--region',
default='us-west1',
help=f'Google Cloud region (default: us-west1)',
)
def run():
parser = argparse.ArgumentParser(
prog='llm-ollama-text',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'-i', '--input-queue',
default=default_input_queue,
help=f'Input queue (default: {default_input_queue})'
)
parser.add_argument(
'-s', '--subscriber',
default=default_subscriber,
help=f'Queue subscriber name (default: {default_subscriber})'
)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)
parser.add_argument(
'-l', '--log-level',
type=LogLevel,
default=LogLevel.INFO,
choices=list(LogLevel),
help=f'Output queue (default: info)'
)
parser.add_argument(
'-m', '--model',
default="gemini-1.0-pro-001",
help=f'LLM model (default: gemini-1.0-pro-001)'
)
# Also: text-bison-32k
parser.add_argument(
'-k', '--private-key',
help=f'Google Cloud private JSON file'
)
parser.add_argument(
'-r', '--region',
default='us-west1',
help=f'Google Cloud region (default: us-west1)',
)
args = parser.parse_args()
if args.private_key:
credentials = service_account.Credentials.from_service_account_file(
args.private_key
)
else:
credentials = None
while True:
try:
p = Processor(
pulsar_host=args.pulsar_host,
input_queue=args.input_queue,
output_queue=args.output_queue,
subscriber=args.subscriber,
log_level=args.log_level,
credentials=credentials,
region=args.region,
model=args.model,
)
p.run()
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)
Processor.start("llm-vertexai-text", __doc__)