Update LLMs to LlmService API (#353)

Authored by cybermaggedon on 2025-04-25 19:57:42 +01:00; committed by GitHub
parent 099018e103
commit 5af7909122
13 changed files with 297 additions and 969 deletions

View file

@ -13,6 +13,11 @@ from .. base import FlowProcessor, ConsumerSpec, ProducerSpec
default_ident = "text-completion" default_ident = "text-completion"
class LlmResult: class LlmResult:
def __init__(self, text=None, in_token=None, out_token=None, model=None):
self.text = text
self.in_token = in_token
self.out_token = out_token
self.model = model
__slots__ = ["text", "in_token", "out_token", "model"] __slots__ = ["text", "in_token", "out_token", "model"]
class LlmService(FlowProcessor): class LlmService(FlowProcessor):

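For reference, a minimal sketch of the pattern the rest of this commit applies to each provider: subclass LlmService, implement generate_content(system, prompt), and return an LlmResult. Only LlmService, LlmResult, the generate_content signature and the keyword-style LlmResult construction are taken from this diff; the default model value and the stubbed "provider" call are placeholders, not a real SDK.

    # Minimal sketch of a text-completion processor under the new API,
    # mirroring the imports used in the provider files below.
    from .... base import LlmService, LlmResult

    default_ident = "text-completion"
    default_model = "example-model"          # placeholder

    class Processor(LlmService):

        def __init__(self, **params):
            model = params.get("model", default_model)
            super(Processor, self).__init__(**params | { "model": model })
            self.model = model

        async def generate_content(self, system, prompt):
            # A real processor calls its provider SDK here; this stub just
            # echoes the prompt so the sketch stays self-contained.
            text = f"echo: {prompt}"
            return LlmResult(
                text = text,
                in_token = len(system.split()) + len(prompt.split()),
                out_token = len(text.split()),
                model = self.model,
            )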
View file

@ -6,22 +6,14 @@ Input is prompt, output is response. Mistral is default.
import boto3 import boto3
import json import json
from prometheus_client import Histogram
import os import os
import enum import enum
from .... schema import TextCompletionRequest, TextCompletionResponse, Error
from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
module = "text-completion" default_ident = "text-completion"
default_input_queue = text_completion_request_queue
default_output_queue = text_completion_response_queue
default_subscriber = module
default_model = 'mistral.mistral-large-2407-v1:0' default_model = 'mistral.mistral-large-2407-v1:0'
default_temperature = 0.0 default_temperature = 0.0
default_max_output = 2048 default_max_output = 2048
@ -149,16 +141,12 @@ class Cohere(ModelHandler):
Default=Mistral Default=Mistral
class Processor(ConsumerProducer): class Processor(LlmService):
def __init__(self, **params): def __init__(self, **params):
print(params) print(params)
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
model = params.get("model", default_model) model = params.get("model", default_model)
temperature = params.get("temperature", default_temperature) temperature = params.get("temperature", default_temperature)
max_output = params.get("max_output", default_max_output) max_output = params.get("max_output", default_max_output)
@ -185,30 +173,12 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
"model": model, "model": model,
"temperature": temperature, "temperature": temperature,
"max_output": max_output, "max_output": max_output,
} }
) )
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
self.model = model self.model = model
self.temperature = temperature self.temperature = temperature
self.max_output = max_output self.max_output = max_output
@ -257,30 +227,21 @@ class Processor(ConsumerProducer):
return Default return Default
async def handle(self, msg): async def generate_content(self, system, prompt):
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
try: try:
promptbody = self.variant.encode_request(v.system, v.prompt) promptbody = self.variant.encode_request(system, prompt)
accept = 'application/json' accept = 'application/json'
contentType = 'application/json' contentType = 'application/json'
with __class__.text_completion_metric.time(): response = self.bedrock.invoke_model(
response = self.bedrock.invoke_model( body=promptbody,
body=promptbody, modelId=self.model,
modelId=self.model, accept=accept,
accept=accept, contentType=contentType
contentType=contentType )
)
# Response structure decode # Response structure decode
outputtext = self.variant.decode_response(response) outputtext = self.variant.decode_response(response)
@ -293,18 +254,14 @@ class Processor(ConsumerProducer):
print(f"Input Tokens: {inputtokens}", flush=True) print(f"Input Tokens: {inputtokens}", flush=True)
print(f"Output Tokens: {outputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True)
print("Send response...", flush=True) resp = LlmResult(
r = TextCompletionResponse( text = outputtext,
error=None, in_token = inputtokens,
response=outputtext, out_token = outputtokens,
in_token=inputtokens, model = self.model
out_token=outputtokens,
model=str(self.model),
) )
await self.send(r, properties={"id": id}) return resp
print("Done.", flush=True)
except self.bedrock.exceptions.ThrottlingException as e: except self.bedrock.exceptions.ThrottlingException as e:
@ -319,31 +276,12 @@ class Processor(ConsumerProducer):
print(type(e)) print(type(e))
print(f"Exception: {e}") print(f"Exception: {e}")
raise e
print("Send error response...", flush=True)
r = TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
ConsumerProducer.add_args( LlmService.add_args(parser)
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument( parser.add_argument(
'-m', '--model', '-m', '--model',
@ -391,5 +329,4 @@ class Processor(ConsumerProducer):
def run(): def run():
Processor.launch(module, __doc__) Processor.launch(default_ident, __doc__)

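The command-line wiring follows the same pattern in every file of this commit: add_args delegates to LlmService.add_args(parser) instead of passing queue names to ConsumerProducer.add_args, and run() launches with default_ident rather than module. A sketch, assuming the -m/--model flag keeps its usual default and help arguments (the hunk above truncates that call):

    @staticmethod
    def add_args(parser):
        # Queue, subscriber and schema options are presumably registered by
        # the LlmService base class now; only provider flags stay here.
        LlmService.add_args(parser)
        parser.add_argument(
            '-m', '--model',
            default=default_model,   # default/help assumed; truncated above
            help=f'LLM model (default: {default_model})',
        )

    def run():
        Processor.launch(default_ident, __doc__)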
View file

@ -9,31 +9,21 @@ import json
from prometheus_client import Histogram from prometheus_client import Histogram
import os import os
from .... schema import TextCompletionRequest, TextCompletionResponse, Error
from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
module = "text-completion" default_ident = "text-completion"
default_input_queue = text_completion_request_queue
default_output_queue = text_completion_response_queue
default_subscriber = module
default_temperature = 0.0 default_temperature = 0.0
default_max_output = 4192 default_max_output = 4192
default_model = "AzureAI" default_model = "AzureAI"
default_endpoint = os.getenv("AZURE_ENDPOINT") default_endpoint = os.getenv("AZURE_ENDPOINT")
default_token = os.getenv("AZURE_TOKEN") default_token = os.getenv("AZURE_TOKEN")
class Processor(ConsumerProducer): class Processor(LlmService):
def __init__(self, **params): def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
endpoint = params.get("endpoint", default_endpoint) endpoint = params.get("endpoint", default_endpoint)
token = params.get("token", default_token) token = params.get("token", default_token)
temperature = params.get("temperature", default_temperature) temperature = params.get("temperature", default_temperature)
@ -48,30 +38,13 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"input_queue": input_queue, "endpoint": endpoint,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
"temperature": temperature, "temperature": temperature,
"max_output": max_output, "max_output": max_output,
"model": model, "model": model,
} }
) )
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
self.endpoint = endpoint self.endpoint = endpoint
self.token = token self.token = token
self.temperature = temperature self.temperature = temperature
@ -123,25 +96,16 @@ class Processor(ConsumerProducer):
return result return result
async def handle(self, msg): async def generate_content(self, system, prompt):
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
try: try:
prompt = self.build_prompt( prompt = self.build_prompt(
v.system, system,
v.prompt prompt
) )
with __class__.text_completion_metric.time(): response = self.call_llm(prompt)
response = self.call_llm(prompt)
resp = response['choices'][0]['message']['content'] resp = response['choices'][0]['message']['content']
inputtokens = response['usage']['prompt_tokens'] inputtokens = response['usage']['prompt_tokens']
@ -153,8 +117,14 @@ class Processor(ConsumerProducer):
print("Send response...", flush=True) print("Send response...", flush=True)
r = TextCompletionResponse(response=resp, error=None, in_token=inputtokens, out_token=outputtokens, model=self.model) resp = LlmResult(
await self.send(r, properties={"id": id}) text = resp,
in_token = inputtokens,
out_token = outputtokens,
model = self.model
)
return resp
except TooManyRequests: except TooManyRequests:
@ -168,33 +138,14 @@ class Processor(ConsumerProducer):
# Apart from rate limits, treat all exceptions as unrecoverable # Apart from rate limits, treat all exceptions as unrecoverable
print(f"Exception: {e}") print(f"Exception: {e}")
raise e
print("Send error response...", flush=True)
r = TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
print("Done.", flush=True) print("Done.", flush=True)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
ConsumerProducer.add_args( LlmService.add_args(parser)
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument( parser.add_argument(
'-e', '--endpoint', '-e', '--endpoint',
@ -224,4 +175,4 @@ class Processor(ConsumerProducer):
def run(): def run():
Processor.launch(module, __doc__) Processor.launch(default_ident, __doc__)

View file

@ -9,18 +9,11 @@ from prometheus_client import Histogram
from openai import AzureOpenAI, RateLimitError from openai import AzureOpenAI, RateLimitError
import os import os
from .... schema import TextCompletionRequest, TextCompletionResponse, Error
from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
module = "text-completion" default_ident = "text-completion"
default_input_queue = text_completion_request_queue
default_output_queue = text_completion_response_queue
default_subscriber = module
default_temperature = 0.0 default_temperature = 0.0
default_max_output = 4192 default_max_output = 4192
default_api = "2024-12-01-preview" default_api = "2024-12-01-preview"
@ -28,13 +21,10 @@ default_endpoint = os.getenv("AZURE_ENDPOINT", None)
default_token = os.getenv("AZURE_TOKEN", None) default_token = os.getenv("AZURE_TOKEN", None)
default_model = os.getenv("AZURE_MODEL", None) default_model = os.getenv("AZURE_MODEL", None)
class Processor(ConsumerProducer): class Processor(LlmService):
def __init__(self, **params): def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
temperature = params.get("temperature", default_temperature) temperature = params.get("temperature", default_temperature)
max_output = params.get("max_output", default_max_output) max_output = params.get("max_output", default_max_output)
@ -51,11 +41,6 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
"temperature": temperature, "temperature": temperature,
"max_output": max_output, "max_output": max_output,
"model": model, "model": model,
@ -63,19 +48,6 @@ class Processor(ConsumerProducer):
} }
) )
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
self.temperature = temperature self.temperature = temperature
self.max_output = max_output self.max_output = max_output
self.model = model self.model = model
@ -84,41 +56,31 @@ class Processor(ConsumerProducer):
api_key=token, api_key=token,
api_version=api, api_version=api,
azure_endpoint = endpoint, azure_endpoint = endpoint,
) )
async def handle(self, msg): async def generate_content(self, system, prompt):
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
prompt = v.system + "\n\n" + v.prompt
prompt = system + "\n\n" + prompt
try: try:
with __class__.text_completion_metric.time(): resp = self.openai.chat.completions.create(
resp = self.openai.chat.completions.create( model=self.model,
model=self.model, messages=[
messages=[ {
{ "role": "user",
"role": "user", "content": [
"content": [ {
{ "type": "text",
"type": "text", "text": prompt
"text": prompt }
} ]
] }
} ],
], temperature=self.temperature,
temperature=self.temperature, max_tokens=self.max_output,
max_tokens=self.max_output, top_p=1,
top_p=1, )
)
inputtokens = resp.usage.prompt_tokens inputtokens = resp.usage.prompt_tokens
outputtokens = resp.usage.completion_tokens outputtokens = resp.usage.completion_tokens
@ -127,15 +89,14 @@ class Processor(ConsumerProducer):
print(f"Output Tokens: {outputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True)
print("Send response...", flush=True) print("Send response...", flush=True)
r = TextCompletionResponse( r = LlmResult(
response=resp.choices[0].message.content, text = resp.choices[0].message.content,
error=None, in_token = inputtokens,
in_token=inputtokens, out_token = outputtokens,
out_token=outputtokens, model = self.model
model=self.model
) )
await self.send(r, properties={"id": id}) return r
except RateLimitError: except RateLimitError:
@ -147,35 +108,15 @@ class Processor(ConsumerProducer):
except Exception as e: except Exception as e:
# Apart from rate limits, treat all exceptions as unrecoverable # Apart from rate limits, treat all exceptions as unrecoverable
print(f"Exception: {e}") print(f"Exception: {e}")
raise e
print("Send error response...", flush=True)
r = TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
print("Done.", flush=True) print("Done.", flush=True)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
ConsumerProducer.add_args( LlmService.add_args(parser)
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument( parser.add_argument(
'-e', '--endpoint', '-e', '--endpoint',
@ -217,4 +158,4 @@ class Processor(ConsumerProducer):
def run(): def run():
Processor.launch(module, __doc__) Processor.launch(default_ident, __doc__)

View file

@ -5,33 +5,22 @@ Input is prompt, output is response.
""" """
import anthropic import anthropic
from prometheus_client import Histogram
import os import os
from .... schema import TextCompletionRequest, TextCompletionResponse, Error
from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
module = "text-completion" default_ident = "text-completion"
default_input_queue = text_completion_request_queue
default_output_queue = text_completion_response_queue
default_subscriber = module
default_model = 'claude-3-5-sonnet-20240620' default_model = 'claude-3-5-sonnet-20240620'
default_temperature = 0.0 default_temperature = 0.0
default_max_output = 8192 default_max_output = 8192
default_api_key = os.getenv("CLAUDE_KEY") default_api_key = os.getenv("CLAUDE_KEY")
class Processor(ConsumerProducer): class Processor(LlmService):
def __init__(self, **params): def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
model = params.get("model", default_model) model = params.get("model", default_model)
api_key = params.get("api_key", default_api_key) api_key = params.get("api_key", default_api_key)
temperature = params.get("temperature", default_temperature) temperature = params.get("temperature", default_temperature)
@ -42,30 +31,12 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
"model": model, "model": model,
"temperature": temperature, "temperature": temperature,
"max_output": max_output, "max_output": max_output,
} }
) )
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
self.model = model self.model = model
self.claude = anthropic.Anthropic(api_key=api_key) self.claude = anthropic.Anthropic(api_key=api_key)
self.temperature = temperature self.temperature = temperature
@ -73,39 +44,27 @@ class Processor(ConsumerProducer):
print("Initialised", flush=True) print("Initialised", flush=True)
async def handle(self, msg): async def generate_content(self, system, prompt):
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
prompt = v.prompt
try: try:
with __class__.text_completion_metric.time(): response = message = self.claude.messages.create(
model=self.model,
response = message = self.claude.messages.create( max_tokens=self.max_output,
model=self.model, temperature=self.temperature,
max_tokens=self.max_output, system = system,
temperature=self.temperature, messages=[
system = v.system, {
messages=[ "role": "user",
{ "content": [
"role": "user", {
"content": [ "type": "text",
{ "text": prompt
"type": "text", }
"text": prompt ]
} }
] ]
} )
]
)
resp = response.content[0].text resp = response.content[0].text
inputtokens = response.usage.input_tokens inputtokens = response.usage.input_tokens
@ -114,17 +73,14 @@ class Processor(ConsumerProducer):
print(f"Input Tokens: {inputtokens}", flush=True) print(f"Input Tokens: {inputtokens}", flush=True)
print(f"Output Tokens: {outputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True)
print("Send response...", flush=True) resp = LlmResult(
r = TextCompletionResponse( text = resp,
response=resp, in_token = inputtokens,
error=None, out_token = outputtokens,
in_token=inputtokens, model = self.model
out_token=outputtokens,
model=self.model
) )
self.send(r, properties={"id": id})
print("Done.", flush=True) return resp
except anthropic.RateLimitError: except anthropic.RateLimitError:
@ -136,31 +92,12 @@ class Processor(ConsumerProducer):
# Apart from rate limits, treat all exceptions as unrecoverable # Apart from rate limits, treat all exceptions as unrecoverable
print(f"Exception: {e}") print(f"Exception: {e}")
raise e
print("Send error response...", flush=True)
r = TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
ConsumerProducer.add_args( LlmService.add_args(parser)
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument( parser.add_argument(
'-m', '--model', '-m', '--model',
@ -190,6 +127,4 @@ class Processor(ConsumerProducer):
def run(): def run():
Processor.launch(module, __doc__) Processor.launch(default_ident, __doc__)

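Error handling also slims down across these files: the per-processor "Send error response..." blocks and the explicit consumer.acknowledge calls are gone, and exceptions simply propagate out of generate_content. A sketch of the apparent contract, using the Claude client above as the example; the mapping of the provider rate-limit error to TooManyRequests, and the assumption that the LlmService base class turns a raised exception into the error response, are mine and not shown in this diff.

    async def generate_content(self, system, prompt):
        try:
            response = self.claude.messages.create(
                model=self.model,
                max_tokens=self.max_output,
                temperature=self.temperature,
                system = system,
                messages=[
                    { "role": "user",
                      "content": [ { "type": "text", "text": prompt } ] }
                ]
            )
            return LlmResult(
                text = response.content[0].text,
                in_token = response.usage.input_tokens,
                out_token = response.usage.output_tokens,
                model = self.model,
            )
        except anthropic.RateLimitError:
            # Assumed mapping: signal back-off/retry instead of failing
            raise TooManyRequests()
        except Exception as e:
            # Everything else is unrecoverable; re-raise and let the base
            # class (presumably) report the failure.
            print(f"Exception: {e}")
            raise e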
View file

@ -8,29 +8,19 @@ import cohere
from prometheus_client import Histogram from prometheus_client import Histogram
import os import os
from .... schema import TextCompletionRequest, TextCompletionResponse, Error
from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
module = "text-completion" default_ident = "text-completion"
default_input_queue = text_completion_request_queue
default_output_queue = text_completion_response_queue
default_subscriber = module
default_model = 'c4ai-aya-23-8b' default_model = 'c4ai-aya-23-8b'
default_temperature = 0.0 default_temperature = 0.0
default_api_key = os.getenv("COHERE_KEY") default_api_key = os.getenv("COHERE_KEY")
class Processor(ConsumerProducer): class Processor(LlmService):
def __init__(self, **params): def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
model = params.get("model", default_model) model = params.get("model", default_model)
api_key = params.get("api_key", default_api_key) api_key = params.get("api_key", default_api_key)
temperature = params.get("temperature", default_temperature) temperature = params.get("temperature", default_temperature)
@ -40,61 +30,30 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
"model": model, "model": model,
"temperature": temperature, "temperature": temperature,
} }
) )
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
self.model = model self.model = model
self.temperature = temperature self.temperature = temperature
self.cohere = cohere.Client(api_key=api_key) self.cohere = cohere.Client(api_key=api_key)
print("Initialised", flush=True) print("Initialised", flush=True)
async def handle(self, msg): async def generate_content(self, system, prompt):
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
system = v.system
prompt = v.prompt
try: try:
with __class__.text_completion_metric.time(): output = self.cohere.chat(
model=self.model,
output = self.cohere.chat( message=prompt,
model=self.model, preamble = system,
message=prompt, temperature=self.temperature,
preamble = system, chat_history=[],
temperature=self.temperature, prompt_truncation='auto',
chat_history=[], connectors=[]
prompt_truncation='auto', )
connectors=[]
)
resp = output.text resp = output.text
inputtokens = int(output.meta.billed_units.input_tokens) inputtokens = int(output.meta.billed_units.input_tokens)
@ -104,11 +63,12 @@ class Processor(ConsumerProducer):
print(f"Input Tokens: {inputtokens}", flush=True) print(f"Input Tokens: {inputtokens}", flush=True)
print(f"Output Tokens: {outputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True)
print("Send response...", flush=True) resp = LlmResult(
r = TextCompletionResponse(response=resp, error=None, in_token=inputtokens, out_token=outputtokens, model=self.model) text = resp,
self.await send(r, properties={"id": id}) in_token = inputtokens,
out_token = outputtokens,
print("Done.", flush=True) model = self.model
)
# FIXME: Wrong exception, don't know what this LLM throws # FIXME: Wrong exception, don't know what this LLM throws
# for a rate limit # for a rate limit
@ -122,31 +82,12 @@ class Processor(ConsumerProducer):
# Apart from rate limits, treat all exceptions as unrecoverable # Apart from rate limits, treat all exceptions as unrecoverable
print(f"Exception: {e}") print(f"Exception: {e}")
raise e
print("Send error response...", flush=True)
r = TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
ConsumerProducer.add_args( LlmService.add_args(parser)
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument( parser.add_argument(
'-m', '--model', '-m', '--model',
@ -169,6 +110,4 @@ class Processor(ConsumerProducer):
def run(): def run():
Processor.launch(module, __doc__) Processor.launch(default_ident, __doc__)

View file

@ -10,30 +10,20 @@ from google.api_core.exceptions import ResourceExhausted
from prometheus_client import Histogram from prometheus_client import Histogram
import os import os
from .... schema import TextCompletionRequest, TextCompletionResponse, Error
from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
module = "text-completion" default_ident = "text-completion"
default_input_queue = text_completion_request_queue
default_output_queue = text_completion_response_queue
default_subscriber = module
default_model = 'gemini-1.5-flash-002' default_model = 'gemini-1.5-flash-002'
default_temperature = 0.0 default_temperature = 0.0
default_max_output = 8192 default_max_output = 8192
default_api_key = os.getenv("GOOGLE_AI_STUDIO_KEY") default_api_key = os.getenv("GOOGLE_AI_STUDIO_KEY")
class Processor(ConsumerProducer): class Processor(LlmService):
def __init__(self, **params): def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
model = params.get("model", default_model) model = params.get("model", default_model)
api_key = params.get("api_key", default_api_key) api_key = params.get("api_key", default_api_key)
temperature = params.get("temperature", default_temperature) temperature = params.get("temperature", default_temperature)
@ -44,30 +34,12 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
"model": model, "model": model,
"temperature": temperature, "temperature": temperature,
"max_output": max_output, "max_output": max_output,
} }
) )
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
genai.configure(api_key=api_key) genai.configure(api_key=api_key)
self.model = model self.model = model
self.temperature = temperature self.temperature = temperature
@ -102,15 +74,7 @@ class Processor(ConsumerProducer):
print("Initialised", flush=True) print("Initialised", flush=True)
async def handle(self, msg): async def generate_content(self, system, prompt):
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
# FIXME: There's a system prompt above. Maybe if system changes, # FIXME: There's a system prompt above. Maybe if system changes,
# then reset self.llm? It shouldn't do, because system prompt # then reset self.llm? It shouldn't do, because system prompt
@ -119,17 +83,15 @@ class Processor(ConsumerProducer):
# Or... could keep different LLM structures for different system # Or... could keep different LLM structures for different system
# prompts? # prompts?
prompt = v.system + "\n\n" + v.prompt prompt = system + "\n\n" + prompt
try: try:
with __class__.text_completion_metric.time(): chat_session = self.llm.start_chat(
history=[
chat_session = self.llm.start_chat( ]
history=[ )
] response = chat_session.send_message(prompt)
)
response = chat_session.send_message(prompt)
resp = response.text resp = response.text
inputtokens = int(response.usage_metadata.prompt_token_count) inputtokens = int(response.usage_metadata.prompt_token_count)
@ -138,17 +100,14 @@ class Processor(ConsumerProducer):
print(f"Input Tokens: {inputtokens}", flush=True) print(f"Input Tokens: {inputtokens}", flush=True)
print(f"Output Tokens: {outputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True)
print("Send response...", flush=True) resp = LlmResult(
r = TextCompletionResponse( text = resp,
response=resp, in_token = inputtokens,
error=None, out_token = outputtokens,
in_token=inputtokens, model = self.model
out_token=outputtokens,
model=self.model
) )
await self.send(r, properties={"id": id})
print("Done.", flush=True) return resp
except ResourceExhausted as e: except ResourceExhausted as e:
@ -163,31 +122,12 @@ class Processor(ConsumerProducer):
print(type(e), flush=True) print(type(e), flush=True)
print(f"Exception: {e}", flush=True) print(f"Exception: {e}", flush=True)
raise e
print("Send error response...", flush=True)
r = TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
ConsumerProducer.add_args( LlmService.add_args(parser)
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument( parser.add_argument(
'-m', '--model', '-m', '--model',
@ -217,6 +157,4 @@ class Processor(ConsumerProducer):
def run(): def run():
Processor.launch(module, __doc__) Processor.launch(default_ident, __doc__)

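Several back-ends in this commit (the Azure OpenAI and Google AI Studio files above, and the llamafile, LM Studio, Mistral, Ollama and OpenAI files below) have no separate system-message channel, so their generate_content folds the system prompt into the user prompt. A short sketch of that variant; call_provider and the response field names are hypothetical stand-ins for whichever SDK call each file actually makes:

    async def generate_content(self, system, prompt):
        # No system channel: prepend the system prompt to the user prompt,
        # exactly as the hunks above and below do.
        prompt = system + "\n\n" + prompt
        response = self.call_provider(prompt)       # hypothetical provider call
        return LlmResult(
            text = response.text,                    # field names illustrative
            in_token = response.in_tokens,
            out_token = response.out_tokens,
            model = self.model,
        )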
View file

@ -5,32 +5,21 @@ Input is prompt, output is response.
""" """
from openai import OpenAI from openai import OpenAI
from prometheus_client import Histogram
from .... schema import TextCompletionRequest, TextCompletionResponse, Error
from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
module = "text-completion" default_ident = "text-completion"
default_input_queue = text_completion_request_queue
default_output_queue = text_completion_response_queue
default_subscriber = module
default_model = 'LLaMA_CPP' default_model = 'LLaMA_CPP'
default_llamafile = os.getenv("LLAMAFILE_URL", "http://localhost:8080/v1") default_llamafile = os.getenv("LLAMAFILE_URL", "http://localhost:8080/v1")
default_temperature = 0.0 default_temperature = 0.0
default_max_output = 4096 default_max_output = 4096
class Processor(ConsumerProducer): class Processor(LlmService):
def __init__(self, **params): def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
model = params.get("model", default_model) model = params.get("model", default_model)
llamafile = params.get("llamafile", default_llamafile) llamafile = params.get("llamafile", default_llamafile)
temperature = params.get("temperature", default_temperature) temperature = params.get("temperature", default_temperature)
@ -38,11 +27,6 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
"model": model, "model": model,
"temperature": temperature, "temperature": temperature,
"max_output": max_output, "max_output": max_output,
@ -50,19 +34,6 @@ class Processor(ConsumerProducer):
} }
) )
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
self.model = model self.model = model
self.llamafile=llamafile self.llamafile=llamafile
self.temperature = temperature self.temperature = temperature
@ -74,38 +45,26 @@ class Processor(ConsumerProducer):
print("Initialised", flush=True) print("Initialised", flush=True)
async def handle(self, msg): async def generate_content(self, system, prompt):
v = msg.value() prompt = system + "\n\n" + prompt
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
prompt = v.system + "\n\n" + v.prompt
try: try:
# FIXME: Rate limits resp = self.openai.chat.completions.create(
model=self.model,
with __class__.text_completion_metric.time(): messages=[
{"role": "user", "content": prompt}
resp = self.openai.chat.completions.create( ]
model=self.model, #temperature=self.temperature,
messages=[ #max_tokens=self.max_output,
{"role": "user", "content": prompt} #top_p=1,
] #frequency_penalty=0,
#temperature=self.temperature, #presence_penalty=0,
#max_tokens=self.max_output, #response_format={
#top_p=1, # "type": "text"
#frequency_penalty=0, #}
#presence_penalty=0, )
#response_format={
# "type": "text"
#}
)
inputtokens = resp.usage.prompt_tokens inputtokens = resp.usage.prompt_tokens
outputtokens = resp.usage.completion_tokens outputtokens = resp.usage.completion_tokens
@ -114,48 +73,26 @@ class Processor(ConsumerProducer):
print(f"Input Tokens: {inputtokens}", flush=True) print(f"Input Tokens: {inputtokens}", flush=True)
print(f"Output Tokens: {outputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True)
print("Send response...", flush=True) resp = LlmResult(
r = TextCompletionResponse( text = resp.choices[0].message.content,
response=resp.choices[0].message.content, in_token = inputtokens,
error=None, out_token = outputtokens,
in_token=inputtokens, model = "llama.cpp",
out_token=outputtokens,
model="llama.cpp"
) )
await self.send(r, properties={"id": id})
print("Done.", flush=True) return resp
# SLM, presumably there aren't rate limits # SLM, presumably there aren't rate limits
except Exception as e: except Exception as e:
print(f"Exception: {e}") print(f"Exception: {e}")
raise e
print("Send error response...", flush=True)
r = TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
ConsumerProducer.add_args( LlmService.add_args(parser)
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument( parser.add_argument(
'-m', '--model', '-m', '--model',
@ -185,6 +122,4 @@ class Processor(ConsumerProducer):
def run(): def run():
Processor.launch(module, __doc__) Processor.launch(default_ident, __doc__)

View file

@ -5,33 +5,23 @@ Input is prompt, output is response.
""" """
from openai import OpenAI from openai import OpenAI
from prometheus_client import Histogram
import os import os
from .... schema import TextCompletionRequest, TextCompletionResponse, Error
from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
module = "text-completion" default_ident = "text-completion"
default_input_queue = text_completion_request_queue
default_output_queue = text_completion_response_queue
default_subscriber = module default_subscriber = module
default_model = 'gemma3:9b' default_model = 'gemma3:9b'
default_url = os.getenv("LMSTUDIO_URL", "http://localhost:1234/") default_url = os.getenv("LMSTUDIO_URL", "http://localhost:1234/")
default_temperature = 0.0 default_temperature = 0.0
default_max_output = 4096 default_max_output = 4096
class Processor(ConsumerProducer): class Processor(LlmService):
def __init__(self, **params): def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
model = params.get("model", default_model) model = params.get("model", default_model)
url = params.get("url", default_url) url = params.get("url", default_url)
temperature = params.get("temperature", default_temperature) temperature = params.get("temperature", default_temperature)
@ -39,11 +29,6 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
"model": model, "model": model,
"temperature": temperature, "temperature": temperature,
"max_output": max_output, "max_output": max_output,
@ -51,19 +36,6 @@ class Processor(ConsumerProducer):
} }
) )
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
self.model = model self.model = model
self.url = url + "v1/" self.url = url + "v1/"
self.temperature = temperature self.temperature = temperature
@ -75,42 +47,30 @@ class Processor(ConsumerProducer):
print("Initialised", flush=True) print("Initialised", flush=True)
async def handle(self, msg): async def generate_content(self, system, prompt):
v = msg.value() prompt = system + "\n\n" + prompt
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
prompt = v.system + "\n\n" + v.prompt
try: try:
# FIXME: Rate limits print(prompt)
with __class__.text_completion_metric.time(): resp = self.openai.chat.completions.create(
model=self.model,
messages=[
{"role": "user", "content": prompt}
]
#temperature=self.temperature,
#max_tokens=self.max_output,
#top_p=1,
#frequency_penalty=0,
#presence_penalty=0,
#response_format={
# "type": "text"
#}
)
print(prompt) print(resp)
resp = self.openai.chat.completions.create(
model=self.model,
messages=[
{"role": "user", "content": prompt}
]
#temperature=self.temperature,
#max_tokens=self.max_output,
#top_p=1,
#frequency_penalty=0,
#presence_penalty=0,
#response_format={
# "type": "text"
#}
)
print(resp)
inputtokens = resp.usage.prompt_tokens inputtokens = resp.usage.prompt_tokens
outputtokens = resp.usage.completion_tokens outputtokens = resp.usage.completion_tokens
@ -119,48 +79,26 @@ class Processor(ConsumerProducer):
print(f"Input Tokens: {inputtokens}", flush=True) print(f"Input Tokens: {inputtokens}", flush=True)
print(f"Output Tokens: {outputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True)
print("Send response...", flush=True) resp = LlmResult(
r = TextCompletionResponse( text = resp.choices[0].message.content,
response=resp.choices[0].message.content, in_token = inputtokens,
error=None, out_token = outputtokens,
in_token=inputtokens, model = self.model
out_token=outputtokens,
model=self.model,
) )
await self.send(r, properties={"id": id})
print("Done.", flush=True) return resp
# SLM, presumably there aren't rate limits # SLM, presumably there aren't rate limits
except Exception as e: except Exception as e:
print(f"Exception: {e}") print(f"Exception: {e}")
raise e
print("Send error response...", flush=True)
r = TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
ConsumerProducer.add_args( LlmService.add_args(parser)
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument( parser.add_argument(
'-m', '--model', '-m', '--model',
@ -189,5 +127,5 @@ class Processor(ConsumerProducer):
) )
def run(): def run():
Processor.launch(module, __doc__)
Processor.launch(default_ident, __doc__)

View file

@ -5,33 +5,22 @@ Input is prompt, output is response.
""" """
from mistralai import Mistral from mistralai import Mistral
from prometheus_client import Histogram
import os import os
from .... schema import TextCompletionRequest, TextCompletionResponse, Error
from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
module = "text-completion" default_ident = "text-completion"
default_input_queue = text_completion_request_queue
default_output_queue = text_completion_response_queue
default_subscriber = module
default_model = 'ministral-8b-latest' default_model = 'ministral-8b-latest'
default_temperature = 0.0 default_temperature = 0.0
default_max_output = 4096 default_max_output = 4096
default_api_key = os.getenv("MISTRAL_TOKEN") default_api_key = os.getenv("MISTRAL_TOKEN")
class Processor(ConsumerProducer): class Processor(LlmService):
def __init__(self, **params): def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
model = params.get("model", default_model) model = params.get("model", default_model)
api_key = params.get("api_key", default_api_key) api_key = params.get("api_key", default_api_key)
temperature = params.get("temperature", default_temperature) temperature = params.get("temperature", default_temperature)
@ -42,30 +31,12 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
"model": model, "model": model,
"temperature": temperature, "temperature": temperature,
"max_output": max_output, "max_output": max_output,
} }
) )
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
self.model = model self.model = model
self.temperature = temperature self.temperature = temperature
self.max_output = max_output self.max_output = max_output
@ -73,44 +44,34 @@ class Processor(ConsumerProducer):
print("Initialised", flush=True) print("Initialised", flush=True)
async def handle(self, msg): async def generate_content(self, system, prompt):
v = msg.value() prompt = system + "\n\n" + prompt
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
prompt = v.system + "\n\n" + v.prompt
try: try:
with __class__.text_completion_metric.time(): resp = self.mistral.chat.complete(
model=self.model,
resp = self.mistral.chat.complete( messages=[
model=self.model, {
messages=[ "role": "user",
{ "content": [
"role": "user", {
"content": [ "type": "text",
{ "text": prompt
"type": "text", }
"text": prompt ]
}
]
}
],
temperature=self.temperature,
max_tokens=self.max_output,
top_p=1,
frequency_penalty=0,
presence_penalty=0,
response_format={
"type": "text"
} }
) ],
temperature=self.temperature,
max_tokens=self.max_output,
top_p=1,
frequency_penalty=0,
presence_penalty=0,
response_format={
"type": "text"
}
)
inputtokens = resp.usage.prompt_tokens inputtokens = resp.usage.prompt_tokens
outputtokens = resp.usage.completion_tokens outputtokens = resp.usage.completion_tokens
@ -118,17 +79,12 @@ class Processor(ConsumerProducer):
print(f"Input Tokens: {inputtokens}", flush=True) print(f"Input Tokens: {inputtokens}", flush=True)
print(f"Output Tokens: {outputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True)
print("Send response...", flush=True) resp = LlmResult(
r = TextCompletionResponse( text = resp.choices[0].message.content,
response=resp.choices[0].message.content, in_token = inputtokens,
error=None, out_token = outputtokens,
in_token=inputtokens, model = self.model
out_token=outputtokens,
model=self.model
) )
await self.send(r, properties={"id": id})
print("Done.", flush=True)
# FIXME: Wrong exception. The MistralAI library has retry logic # FIXME: Wrong exception. The MistralAI library has retry logic
# so retry-able errors are retried transparently. It means we # so retry-able errors are retried transparently. It means we
@ -148,31 +104,12 @@ class Processor(ConsumerProducer):
# Apart from rate limits, treat all exceptions as unrecoverable # Apart from rate limits, treat all exceptions as unrecoverable
print(f"Exception: {e}") print(f"Exception: {e}")
raise e
print("Send error response...", flush=True)
r = TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
ConsumerProducer.add_args( LlmService.add_args(parser)
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument( parser.add_argument(
'-m', '--model', '-m', '--model',
@ -202,6 +139,4 @@ class Processor(ConsumerProducer):
def run(): def run():
Processor.launch(module, __doc__) Processor.launch(default_ident, __doc__)

View file

@ -5,87 +5,40 @@ Input is prompt, output is response.
""" """
from ollama import Client from ollama import Client
from prometheus_client import Histogram, Info
import os import os
from .... schema import TextCompletionRequest, TextCompletionResponse, Error
from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
module = "text-completion" default_ident = "text-completion"
default_input_queue = text_completion_request_queue
default_output_queue = text_completion_response_queue
default_subscriber = module
default_model = 'gemma2:9b' default_model = 'gemma2:9b'
default_ollama = os.getenv("OLLAMA_HOST", 'http://localhost:11434') default_ollama = os.getenv("OLLAMA_HOST", 'http://localhost:11434')
class Processor(ConsumerProducer): class Processor(LlmService):
def __init__(self, **params): def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
model = params.get("model", default_model) model = params.get("model", default_model)
ollama = params.get("ollama", default_ollama) ollama = params.get("ollama", default_ollama)
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"model": model, "model": model,
"ollama": ollama, "ollama": ollama,
"input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
} }
) )
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
if not hasattr(__class__, "model_metric"):
__class__.model_metric = Info(
'model', 'Model information'
)
__class__.model_metric.info({
"model": model,
"ollama": ollama,
})
self.model = model self.model = model
self.llm = Client(host=ollama) self.llm = Client(host=ollama)
async def handle(self, msg): async def generate_content(self, system, prompt):
v = msg.value() prompt = system + "\n\n" + prompt
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
prompt = v.system + "\n\n" + v.prompt
try: try:
with __class__.text_completion_metric.time(): response = self.llm.generate(self.model, prompt)
response = self.llm.generate(self.model, prompt)
response_text = response['response'] response_text = response['response']
print("Send response...", flush=True) print("Send response...", flush=True)
@ -94,42 +47,26 @@ class Processor(ConsumerProducer):
inputtokens = int(response['prompt_eval_count']) inputtokens = int(response['prompt_eval_count'])
outputtokens = int(response['eval_count']) outputtokens = int(response['eval_count'])
r = TextCompletionResponse(response=response_text, error=None, in_token=inputtokens, out_token=outputtokens, model="ollama") resp = LlmResult(
text = response_text,
in_token = inputtokens,
out_token = outputtokens,
model = self.model
)
await self.send(r, properties={"id": id}) return resp
print("Done.", flush=True)
# SLM, presumably no rate limits # SLM, presumably no rate limits
except Exception as e: except Exception as e:
print(f"Exception: {e}") print(f"Exception: {e}")
raise e
print("Send error response...", flush=True)
r = TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
ConsumerProducer.add_args( LlmService.add_args(parser)
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument( parser.add_argument(
'-m', '--model', '-m', '--model',
@ -145,6 +82,4 @@ class Processor(ConsumerProducer):
def run(): def run():
Processor.launch(module, __doc__) Processor.launch(default_ident, __doc__)

View file

@ -5,20 +5,13 @@ Input is prompt, output is response.
""" """
from openai import OpenAI, RateLimitError from openai import OpenAI, RateLimitError
from prometheus_client import Histogram
import os import os
from .... schema import TextCompletionRequest, TextCompletionResponse, Error
from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
module = "text-completion" default_ident = "text-completion"
default_input_queue = text_completion_request_queue
default_output_queue = text_completion_response_queue
default_subscriber = module default_subscriber = module
default_model = 'gpt-3.5-turbo' default_model = 'gpt-3.5-turbo'
default_temperature = 0.0 default_temperature = 0.0
@ -26,13 +19,10 @@ default_max_output = 4096
default_api_key = os.getenv("OPENAI_TOKEN") default_api_key = os.getenv("OPENAI_TOKEN")
default_base_url = os.getenv("OPENAI_BASE_URL", None) default_base_url = os.getenv("OPENAI_BASE_URL", None)
class Processor(ConsumerProducer): class Processor(LlmService):
def __init__(self, **params): def __init__(self, **params):
input_queue = params.get("input_queue", default_input_queue)
output_queue = params.get("output_queue", default_output_queue)
subscriber = params.get("subscriber", default_subscriber)
model = params.get("model", default_model) model = params.get("model", default_model)
api_key = params.get("api_key", default_api_key) api_key = params.get("api_key", default_api_key)
base_url = params.get("base_url", default_base_url) base_url = params.get("base_url", default_base_url)
@ -44,11 +34,6 @@ class Processor(ConsumerProducer):
super(Processor, self).__init__( super(Processor, self).__init__(
**params | { **params | {
"input_queue": input_queue,
"output_queue": output_queue,
"subscriber": subscriber,
"input_schema": TextCompletionRequest,
"output_schema": TextCompletionResponse,
"model": model, "model": model,
"temperature": temperature, "temperature": temperature,
"max_output": max_output, "max_output": max_output,
@ -56,19 +41,6 @@ class Processor(ConsumerProducer):
} }
) )
if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram(
'text_completion_duration',
'Text completion duration (seconds)',
buckets=[
0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
120.0
]
)
self.model = model self.model = model
self.temperature = temperature self.temperature = temperature
self.max_output = max_output self.max_output = max_output
@ -76,44 +48,34 @@ class Processor(ConsumerProducer):
print("Initialised", flush=True) print("Initialised", flush=True)
async def handle(self, msg): async def generate_content(self, system, prompt):
v = msg.value() prompt = system + "\n\n" + prompt
# Sender-produced ID
id = msg.properties()["id"]
print(f"Handling prompt {id}...", flush=True)
prompt = v.system + "\n\n" + v.prompt
try: try:
with __class__.text_completion_metric.time(): resp = self.openai.chat.completions.create(
model=self.model,
resp = self.openai.chat.completions.create( messages=[
model=self.model, {
messages=[ "role": "user",
{ "content": [
"role": "user", {
"content": [ "type": "text",
{ "text": prompt
"type": "text", }
"text": prompt ]
}
]
}
],
temperature=self.temperature,
max_tokens=self.max_output,
top_p=1,
frequency_penalty=0,
presence_penalty=0,
response_format={
"type": "text"
} }
) ],
temperature=self.temperature,
max_tokens=self.max_output,
top_p=1,
frequency_penalty=0,
presence_penalty=0,
response_format={
"type": "text"
}
)
inputtokens = resp.usage.prompt_tokens inputtokens = resp.usage.prompt_tokens
outputtokens = resp.usage.completion_tokens outputtokens = resp.usage.completion_tokens
@ -121,17 +83,14 @@ class Processor(ConsumerProducer):
print(f"Input Tokens: {inputtokens}", flush=True) print(f"Input Tokens: {inputtokens}", flush=True)
print(f"Output Tokens: {outputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True)
print("Send response...", flush=True) resp = LlmResult(
r = TextCompletionResponse( text = resp.choices[0].message.content,
response=resp.choices[0].message.content, in_token = inputtokens,
error=None, out_token = outputtokens,
in_token=inputtokens, model = self.model
out_token=outputtokens,
model=self.model
) )
await self.send(r, properties={"id": id})
print("Done.", flush=True) return resp
# FIXME: Wrong exception, don't know what this LLM throws # FIXME: Wrong exception, don't know what this LLM throws
# for a rate limit # for a rate limit
@ -145,31 +104,12 @@ class Processor(ConsumerProducer):
# Apart from rate limits, treat all exceptions as unrecoverable # Apart from rate limits, treat all exceptions as unrecoverable
print(f"Exception: {e}") print(f"Exception: {e}")
raise e
print("Send error response...", flush=True)
r = TextCompletionResponse(
error=Error(
type = "llm-error",
message = str(e),
),
response=None,
in_token=None,
out_token=None,
model=None,
)
await self.send(r, properties={"id": id})
self.consumer.acknowledge(msg)
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
ConsumerProducer.add_args( LlmService.add_args(parser)
parser, default_input_queue, default_subscriber,
default_output_queue,
)
parser.add_argument( parser.add_argument(
'-m', '--model', '-m', '--model',
@ -205,6 +145,4 @@ class Processor(ConsumerProducer):
def run(): def run():
Processor.launch(module, __doc__) Processor.launch(default_ident, __doc__)

View file

@ -105,11 +105,12 @@ class Processor(LlmService):
safety_settings=self.safety_settings safety_settings=self.safety_settings
) )
resp = LlmResult() resp = LlmResult(
resp.text = response.text text = response.text,
resp.in_token = response.usage_metadata.prompt_token_count in_token = response.usage_metadata.prompt_token_count,
resp.out_token = response.usage_metadata.candidates_token_count out_token = response.usage_metadata.candidates_token_count,
resp.model = self.model model = self.model
)
print(f"Input Tokens: {resp.in_token}", flush=True) print(f"Input Tokens: {resp.in_token}", flush=True)
print(f"Output Tokens: {resp.out_token}", flush=True) print(f"Output Tokens: {resp.out_token}", flush=True)