diff --git a/trustgraph-base/trustgraph/base/llm_service.py b/trustgraph-base/trustgraph/base/llm_service.py index 39323db7..c79b819b 100644 --- a/trustgraph-base/trustgraph/base/llm_service.py +++ b/trustgraph-base/trustgraph/base/llm_service.py @@ -13,6 +13,11 @@ from .. base import FlowProcessor, ConsumerSpec, ProducerSpec default_ident = "text-completion" class LlmResult: + def __init__(self, text=None, in_token=None, out_token=None, model=None): + self.text = text + self.in_token = in_token + self.out_token = out_token + self.model = model __slots__ = ["text", "in_token", "out_token", "model"] class LlmService(FlowProcessor): diff --git a/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py b/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py index 572e01b7..156030d0 100755 --- a/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py +++ b/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py @@ -6,22 +6,14 @@ Input is prompt, output is response. Mistral is default. import boto3 import json -from prometheus_client import Histogram import os import enum -from .... schema import TextCompletionRequest, TextCompletionResponse, Error -from .... schema import text_completion_request_queue -from .... schema import text_completion_response_queue -from .... log_level import LogLevel -from .... base import ConsumerProducer from .... exceptions import TooManyRequests +from .... base import LlmService, LlmResult -module = "text-completion" +default_ident = "text-completion" -default_input_queue = text_completion_request_queue -default_output_queue = text_completion_response_queue -default_subscriber = module default_model = 'mistral.mistral-large-2407-v1:0' default_temperature = 0.0 default_max_output = 2048 @@ -149,16 +141,12 @@ class Cohere(ModelHandler): Default=Mistral -class Processor(ConsumerProducer): +class Processor(LlmService): def __init__(self, **params): print(params) - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) - model = params.get("model", default_model) temperature = params.get("temperature", default_temperature) max_output = params.get("max_output", default_max_output) @@ -185,30 +173,12 @@ class Processor(ConsumerProducer): super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": TextCompletionRequest, - "output_schema": TextCompletionResponse, "model": model, "temperature": temperature, "max_output": max_output, } ) - if not hasattr(__class__, "text_completion_metric"): - __class__.text_completion_metric = Histogram( - 'text_completion_duration', - 'Text completion duration (seconds)', - buckets=[ - 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, - 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, - 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, - 30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0, - 120.0 - ] - ) - self.model = model self.temperature = temperature self.max_output = max_output @@ -257,30 +227,21 @@ class Processor(ConsumerProducer): return Default - async def handle(self, msg): - - v = msg.value() - - # Sender-produced ID - - id = msg.properties()["id"] - - print(f"Handling prompt {id}...", flush=True) + async def generate_content(self, system, prompt): try: - promptbody = self.variant.encode_request(v.system, v.prompt) + promptbody = 
self.variant.encode_request(system, prompt) accept = 'application/json' contentType = 'application/json' - with __class__.text_completion_metric.time(): - response = self.bedrock.invoke_model( - body=promptbody, - modelId=self.model, - accept=accept, - contentType=contentType - ) + response = self.bedrock.invoke_model( + body=promptbody, + modelId=self.model, + accept=accept, + contentType=contentType + ) # Response structure decode outputtext = self.variant.decode_response(response) @@ -293,18 +254,14 @@ class Processor(ConsumerProducer): print(f"Input Tokens: {inputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True) - print("Send response...", flush=True) - r = TextCompletionResponse( - error=None, - response=outputtext, - in_token=inputtokens, - out_token=outputtokens, - model=str(self.model), + resp = LlmResult( + text = outputtext, + in_token = inputtokens, + out_token = outputtokens, + model = self.model ) - await self.send(r, properties={"id": id}) - - print("Done.", flush=True) + return resp except self.bedrock.exceptions.ThrottlingException as e: @@ -319,31 +276,12 @@ class Processor(ConsumerProducer): print(type(e)) print(f"Exception: {e}") - - print("Send error response...", flush=True) - - r = TextCompletionResponse( - error=Error( - type = "llm-error", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, - ) - - await self.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + raise e @staticmethod def add_args(parser): - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + LlmService.add_args(parser) parser.add_argument( '-m', '--model', @@ -391,5 +329,4 @@ class Processor(ConsumerProducer): def run(): - Processor.launch(module, __doc__) - + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py b/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py index 79118cc8..70b07606 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py @@ -9,31 +9,21 @@ import json from prometheus_client import Histogram import os -from .... schema import TextCompletionRequest, TextCompletionResponse, Error -from .... schema import text_completion_request_queue -from .... schema import text_completion_response_queue -from .... log_level import LogLevel -from .... base import ConsumerProducer from .... exceptions import TooManyRequests +from .... 
base import LlmService, LlmResult -module = "text-completion" +default_ident = "text-completion" -default_input_queue = text_completion_request_queue -default_output_queue = text_completion_response_queue -default_subscriber = module default_temperature = 0.0 default_max_output = 4192 default_model = "AzureAI" default_endpoint = os.getenv("AZURE_ENDPOINT") default_token = os.getenv("AZURE_TOKEN") -class Processor(ConsumerProducer): +class Processor(LlmService): def __init__(self, **params): - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) endpoint = params.get("endpoint", default_endpoint) token = params.get("token", default_token) temperature = params.get("temperature", default_temperature) @@ -48,30 +38,13 @@ class Processor(ConsumerProducer): super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": TextCompletionRequest, - "output_schema": TextCompletionResponse, + "endpoint": endpoint, "temperature": temperature, "max_output": max_output, "model": model, } ) - if not hasattr(__class__, "text_completion_metric"): - __class__.text_completion_metric = Histogram( - 'text_completion_duration', - 'Text completion duration (seconds)', - buckets=[ - 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, - 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, - 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, - 30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0, - 120.0 - ] - ) - self.endpoint = endpoint self.token = token self.temperature = temperature @@ -123,25 +96,16 @@ class Processor(ConsumerProducer): return result - async def handle(self, msg): - - v = msg.value() - - # Sender-produced ID - - id = msg.properties()["id"] - - print(f"Handling prompt {id}...", flush=True) + async def generate_content(self, system, prompt): try: prompt = self.build_prompt( - v.system, - v.prompt + system, + prompt ) - with __class__.text_completion_metric.time(): - response = self.call_llm(prompt) + response = self.call_llm(prompt) resp = response['choices'][0]['message']['content'] inputtokens = response['usage']['prompt_tokens'] @@ -153,8 +117,14 @@ class Processor(ConsumerProducer): print("Send response...", flush=True) - r = TextCompletionResponse(response=resp, error=None, in_token=inputtokens, out_token=outputtokens, model=self.model) - await self.send(r, properties={"id": id}) + resp = LlmResult( + text = resp, + in_token = inputtokens, + out_token = outputtokens, + model = self.model + ) + + return resp except TooManyRequests: @@ -168,33 +138,14 @@ class Processor(ConsumerProducer): # Apart from rate limits, treat all exceptions as unrecoverable print(f"Exception: {e}") - - print("Send error response...", flush=True) - - r = TextCompletionResponse( - error=Error( - type = "llm-error", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, - ) - - await self.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + raise e print("Done.", flush=True) @staticmethod def add_args(parser): - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + LlmService.add_args(parser) parser.add_argument( '-e', '--endpoint', @@ -224,4 +175,4 @@ class Processor(ConsumerProducer): def run(): - Processor.launch(module, __doc__) + Processor.launch(default_ident, __doc__) diff --git 
a/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py b/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py index 734b20c5..c5dd097c 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py @@ -9,18 +9,11 @@ from prometheus_client import Histogram from openai import AzureOpenAI, RateLimitError import os -from .... schema import TextCompletionRequest, TextCompletionResponse, Error -from .... schema import text_completion_request_queue -from .... schema import text_completion_response_queue -from .... log_level import LogLevel -from .... base import ConsumerProducer from .... exceptions import TooManyRequests +from .... base import LlmService, LlmResult -module = "text-completion" +default_ident = "text-completion" -default_input_queue = text_completion_request_queue -default_output_queue = text_completion_response_queue -default_subscriber = module default_temperature = 0.0 default_max_output = 4192 default_api = "2024-12-01-preview" @@ -28,13 +21,10 @@ default_endpoint = os.getenv("AZURE_ENDPOINT", None) default_token = os.getenv("AZURE_TOKEN", None) default_model = os.getenv("AZURE_MODEL", None) -class Processor(ConsumerProducer): +class Processor(LlmService): def __init__(self, **params): - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) temperature = params.get("temperature", default_temperature) max_output = params.get("max_output", default_max_output) @@ -51,11 +41,6 @@ class Processor(ConsumerProducer): super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": TextCompletionRequest, - "output_schema": TextCompletionResponse, "temperature": temperature, "max_output": max_output, "model": model, @@ -63,19 +48,6 @@ class Processor(ConsumerProducer): } ) - if not hasattr(__class__, "text_completion_metric"): - __class__.text_completion_metric = Histogram( - 'text_completion_duration', - 'Text completion duration (seconds)', - buckets=[ - 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, - 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, - 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, - 30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0, - 120.0 - ] - ) - self.temperature = temperature self.max_output = max_output self.model = model @@ -84,41 +56,31 @@ class Processor(ConsumerProducer): api_key=token, api_version=api, azure_endpoint = endpoint, - ) + ) - async def handle(self, msg): - - v = msg.value() - - # Sender-produced ID - - id = msg.properties()["id"] - - print(f"Handling prompt {id}...", flush=True) - - prompt = v.system + "\n\n" + v.prompt + async def generate_content(self, system, prompt): + prompt = system + "\n\n" + prompt try: - with __class__.text_completion_metric.time(): - resp = self.openai.chat.completions.create( - model=self.model, - messages=[ - { - "role": "user", - "content": [ - { - "type": "text", - "text": prompt - } - ] - } - ], - temperature=self.temperature, - max_tokens=self.max_output, - top_p=1, - ) + resp = self.openai.chat.completions.create( + model=self.model, + messages=[ + { + "role": "user", + "content": [ + { + "type": "text", + "text": prompt + } + ] + } + ], + temperature=self.temperature, + max_tokens=self.max_output, + top_p=1, + ) inputtokens = resp.usage.prompt_tokens 
outputtokens = resp.usage.completion_tokens @@ -127,15 +89,14 @@ class Processor(ConsumerProducer): print(f"Output Tokens: {outputtokens}", flush=True) print("Send response...", flush=True) - r = TextCompletionResponse( - response=resp.choices[0].message.content, - error=None, - in_token=inputtokens, - out_token=outputtokens, - model=self.model + r = LlmResult( + text = resp.choices[0].message.content, + in_token = inputtokens, + out_token = outputtokens, + model = self.model ) - await self.send(r, properties={"id": id}) + return r except RateLimitError: @@ -147,35 +108,15 @@ class Processor(ConsumerProducer): except Exception as e: # Apart from rate limits, treat all exceptions as unrecoverable - print(f"Exception: {e}") - - print("Send error response...", flush=True) - - r = TextCompletionResponse( - error=Error( - type = "llm-error", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, - ) - - await self.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + raise e print("Done.", flush=True) @staticmethod def add_args(parser): - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + LlmService.add_args(parser) parser.add_argument( '-e', '--endpoint', @@ -217,4 +158,4 @@ class Processor(ConsumerProducer): def run(): - Processor.launch(module, __doc__) + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py b/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py index f60b70d7..e69c2095 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py @@ -5,33 +5,22 @@ Input is prompt, output is response. """ import anthropic -from prometheus_client import Histogram import os -from .... schema import TextCompletionRequest, TextCompletionResponse, Error -from .... schema import text_completion_request_queue -from .... schema import text_completion_response_queue -from .... log_level import LogLevel -from .... base import ConsumerProducer from .... exceptions import TooManyRequests +from .... 
base import LlmService, LlmResult -module = "text-completion" +default_ident = "text-completion" -default_input_queue = text_completion_request_queue -default_output_queue = text_completion_response_queue -default_subscriber = module default_model = 'claude-3-5-sonnet-20240620' default_temperature = 0.0 default_max_output = 8192 default_api_key = os.getenv("CLAUDE_KEY") -class Processor(ConsumerProducer): +class Processor(LlmService): def __init__(self, **params): - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) model = params.get("model", default_model) api_key = params.get("api_key", default_api_key) temperature = params.get("temperature", default_temperature) @@ -42,30 +31,12 @@ class Processor(ConsumerProducer): super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": TextCompletionRequest, - "output_schema": TextCompletionResponse, "model": model, "temperature": temperature, "max_output": max_output, } ) - if not hasattr(__class__, "text_completion_metric"): - __class__.text_completion_metric = Histogram( - 'text_completion_duration', - 'Text completion duration (seconds)', - buckets=[ - 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, - 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, - 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, - 30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0, - 120.0 - ] - ) - self.model = model self.claude = anthropic.Anthropic(api_key=api_key) self.temperature = temperature @@ -73,39 +44,27 @@ class Processor(ConsumerProducer): print("Initialised", flush=True) - async def handle(self, msg): - - v = msg.value() - - # Sender-produced ID - - id = msg.properties()["id"] - - print(f"Handling prompt {id}...", flush=True) - - prompt = v.prompt + async def generate_content(self, system, prompt): try: - with __class__.text_completion_metric.time(): - - response = message = self.claude.messages.create( - model=self.model, - max_tokens=self.max_output, - temperature=self.temperature, - system = v.system, - messages=[ - { - "role": "user", - "content": [ - { - "type": "text", - "text": prompt - } - ] - } - ] - ) + response = message = self.claude.messages.create( + model=self.model, + max_tokens=self.max_output, + temperature=self.temperature, + system = system, + messages=[ + { + "role": "user", + "content": [ + { + "type": "text", + "text": prompt + } + ] + } + ] + ) resp = response.content[0].text inputtokens = response.usage.input_tokens @@ -114,17 +73,14 @@ class Processor(ConsumerProducer): print(f"Input Tokens: {inputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True) - print("Send response...", flush=True) - r = TextCompletionResponse( - response=resp, - error=None, - in_token=inputtokens, - out_token=outputtokens, - model=self.model + resp = LlmResult( + text = resp, + in_token = inputtokens, + out_token = outputtokens, + model = self.model ) - self.send(r, properties={"id": id}) - print("Done.", flush=True) + return resp except anthropic.RateLimitError: @@ -136,31 +92,12 @@ class Processor(ConsumerProducer): # Apart from rate limits, treat all exceptions as unrecoverable print(f"Exception: {e}") - - print("Send error response...", flush=True) - - r = TextCompletionResponse( - error=Error( - type = "llm-error", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, 
- ) - - await self.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + raise e @staticmethod def add_args(parser): - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + LlmService.add_args(parser) parser.add_argument( '-m', '--model', @@ -189,7 +126,5 @@ class Processor(ConsumerProducer): ) def run(): - - Processor.launch(module, __doc__) - + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/cohere/llm.py b/trustgraph-flow/trustgraph/model/text_completion/cohere/llm.py index df104ada..d6b2b971 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/cohere/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/cohere/llm.py @@ -8,29 +8,19 @@ import cohere from prometheus_client import Histogram import os -from .... schema import TextCompletionRequest, TextCompletionResponse, Error -from .... schema import text_completion_request_queue -from .... schema import text_completion_response_queue -from .... log_level import LogLevel -from .... base import ConsumerProducer from .... exceptions import TooManyRequests +from .... base import LlmService, LlmResult -module = "text-completion" +default_ident = "text-completion" -default_input_queue = text_completion_request_queue -default_output_queue = text_completion_response_queue -default_subscriber = module default_model = 'c4ai-aya-23-8b' default_temperature = 0.0 default_api_key = os.getenv("COHERE_KEY") -class Processor(ConsumerProducer): +class Processor(LlmService): def __init__(self, **params): - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) model = params.get("model", default_model) api_key = params.get("api_key", default_api_key) temperature = params.get("temperature", default_temperature) @@ -40,61 +30,30 @@ class Processor(ConsumerProducer): super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": TextCompletionRequest, - "output_schema": TextCompletionResponse, "model": model, "temperature": temperature, } ) - if not hasattr(__class__, "text_completion_metric"): - __class__.text_completion_metric = Histogram( - 'text_completion_duration', - 'Text completion duration (seconds)', - buckets=[ - 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, - 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, - 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, - 30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0, - 120.0 - ] - ) - self.model = model self.temperature = temperature self.cohere = cohere.Client(api_key=api_key) print("Initialised", flush=True) - async def handle(self, msg): - - v = msg.value() - - # Sender-produced ID - - id = msg.properties()["id"] - - print(f"Handling prompt {id}...", flush=True) - - system = v.system - prompt = v.prompt + async def generate_content(self, system, prompt): try: - with __class__.text_completion_metric.time(): - - output = self.cohere.chat( - model=self.model, - message=prompt, - preamble = system, - temperature=self.temperature, - chat_history=[], - prompt_truncation='auto', - connectors=[] - ) + output = self.cohere.chat( + model=self.model, + message=prompt, + preamble = system, + temperature=self.temperature, + chat_history=[], + prompt_truncation='auto', + connectors=[] + ) resp = output.text inputtokens = 
int(output.meta.billed_units.input_tokens)
@@ -104,11 +63,13 @@ class Processor(ConsumerProducer):
             print(f"Input Tokens: {inputtokens}", flush=True)
             print(f"Output Tokens: {outputtokens}", flush=True)

-            print("Send response...", flush=True)
-            r = TextCompletionResponse(response=resp, error=None, in_token=inputtokens, out_token=outputtokens, model=self.model)
-            self.await send(r, properties={"id": id})
-
-            print("Done.", flush=True)
+            resp = LlmResult(
+                text = resp,
+                in_token = inputtokens,
+                out_token = outputtokens,
+                model = self.model
+            )
+            return resp

         # FIXME: Wrong exception, don't know what this LLM throws
         # for a rate limit
@@ -122,31 +83,12 @@ class Processor(ConsumerProducer):

            # Apart from rate limits, treat all exceptions as unrecoverable

             print(f"Exception: {e}")
-
-            print("Send error response...", flush=True)
-
-            r = TextCompletionResponse(
-                error=Error(
-                    type = "llm-error",
-                    message = str(e),
-                ),
-                response=None,
-                in_token=None,
-                out_token=None,
-                model=None,
-            )
-
-            await self.send(r, properties={"id": id})
-
-            self.consumer.acknowledge(msg)
+            raise e

     @staticmethod
     def add_args(parser):

-        ConsumerProducer.add_args(
-            parser, default_input_queue, default_subscriber,
-            default_output_queue,
-        )
+        LlmService.add_args(parser)

         parser.add_argument(
             '-m', '--model',
@@ -168,7 +110,5 @@ class Processor(ConsumerProducer):
         )

 def run():
-
-    Processor.launch(module, __doc__)
-
+    Processor.launch(default_ident, __doc__)
diff --git a/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py b/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py
index 9f382572..051e2fe5 100644
--- a/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py
+++ b/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py
@@ -10,30 +10,20 @@
 from google.api_core.exceptions import ResourceExhausted
 from prometheus_client import Histogram
 import os

-from .... schema import TextCompletionRequest, TextCompletionResponse, Error
-from .... schema import text_completion_request_queue
-from .... schema import text_completion_response_queue
-from .... log_level import LogLevel
-from .... base import ConsumerProducer
 from .... exceptions import TooManyRequests
+from .... 
base import LlmService, LlmResult -module = "text-completion" +default_ident = "text-completion" -default_input_queue = text_completion_request_queue -default_output_queue = text_completion_response_queue -default_subscriber = module default_model = 'gemini-1.5-flash-002' default_temperature = 0.0 default_max_output = 8192 default_api_key = os.getenv("GOOGLE_AI_STUDIO_KEY") -class Processor(ConsumerProducer): +class Processor(LlmService): def __init__(self, **params): - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) model = params.get("model", default_model) api_key = params.get("api_key", default_api_key) temperature = params.get("temperature", default_temperature) @@ -44,30 +34,12 @@ class Processor(ConsumerProducer): super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": TextCompletionRequest, - "output_schema": TextCompletionResponse, "model": model, "temperature": temperature, "max_output": max_output, } ) - if not hasattr(__class__, "text_completion_metric"): - __class__.text_completion_metric = Histogram( - 'text_completion_duration', - 'Text completion duration (seconds)', - buckets=[ - 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, - 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, - 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, - 30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0, - 120.0 - ] - ) - genai.configure(api_key=api_key) self.model = model self.temperature = temperature @@ -102,15 +74,7 @@ class Processor(ConsumerProducer): print("Initialised", flush=True) - async def handle(self, msg): - - v = msg.value() - - # Sender-produced ID - - id = msg.properties()["id"] - - print(f"Handling prompt {id}...", flush=True) + async def generate_content(self, system, prompt): # FIXME: There's a system prompt above. Maybe if system changes, # then reset self.llm? It shouldn't do, because system prompt @@ -119,17 +83,15 @@ class Processor(ConsumerProducer): # Or... could keep different LLM structures for different system # prompts? 
- prompt = v.system + "\n\n" + v.prompt + prompt = system + "\n\n" + prompt try: - with __class__.text_completion_metric.time(): - - chat_session = self.llm.start_chat( - history=[ - ] - ) - response = chat_session.send_message(prompt) + chat_session = self.llm.start_chat( + history=[ + ] + ) + response = chat_session.send_message(prompt) resp = response.text inputtokens = int(response.usage_metadata.prompt_token_count) @@ -138,17 +100,14 @@ class Processor(ConsumerProducer): print(f"Input Tokens: {inputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True) - print("Send response...", flush=True) - r = TextCompletionResponse( - response=resp, - error=None, - in_token=inputtokens, - out_token=outputtokens, - model=self.model + resp = LlmResult( + text = resp, + in_token = inputtokens, + out_token = outputtokens, + model = self.model ) - await self.send(r, properties={"id": id}) - print("Done.", flush=True) + return resp except ResourceExhausted as e: @@ -163,31 +122,12 @@ class Processor(ConsumerProducer): print(type(e), flush=True) print(f"Exception: {e}", flush=True) - - print("Send error response...", flush=True) - - r = TextCompletionResponse( - error=Error( - type = "llm-error", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, - ) - - await self.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + raise e @staticmethod def add_args(parser): - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + LlmService.add_args(parser) parser.add_argument( '-m', '--model', @@ -216,7 +156,5 @@ class Processor(ConsumerProducer): ) def run(): - - Processor.launch(module, __doc__) - + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py b/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py index fd473564..76300c5a 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py @@ -5,32 +5,21 @@ Input is prompt, output is response. """ from openai import OpenAI -from prometheus_client import Histogram -from .... schema import TextCompletionRequest, TextCompletionResponse, Error -from .... schema import text_completion_request_queue -from .... schema import text_completion_response_queue -from .... log_level import LogLevel -from .... base import ConsumerProducer from .... exceptions import TooManyRequests +from .... 
base import LlmService, LlmResult -module = "text-completion" +default_ident = "text-completion" -default_input_queue = text_completion_request_queue -default_output_queue = text_completion_response_queue -default_subscriber = module default_model = 'LLaMA_CPP' default_llamafile = os.getenv("LLAMAFILE_URL", "http://localhost:8080/v1") default_temperature = 0.0 default_max_output = 4096 -class Processor(ConsumerProducer): +class Processor(LlmService): def __init__(self, **params): - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) model = params.get("model", default_model) llamafile = params.get("llamafile", default_llamafile) temperature = params.get("temperature", default_temperature) @@ -38,11 +27,6 @@ class Processor(ConsumerProducer): super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": TextCompletionRequest, - "output_schema": TextCompletionResponse, "model": model, "temperature": temperature, "max_output": max_output, @@ -50,19 +34,6 @@ class Processor(ConsumerProducer): } ) - if not hasattr(__class__, "text_completion_metric"): - __class__.text_completion_metric = Histogram( - 'text_completion_duration', - 'Text completion duration (seconds)', - buckets=[ - 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, - 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, - 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, - 30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0, - 120.0 - ] - ) - self.model = model self.llamafile=llamafile self.temperature = temperature @@ -74,38 +45,26 @@ class Processor(ConsumerProducer): print("Initialised", flush=True) - async def handle(self, msg): + async def generate_content(self, system, prompt): - v = msg.value() - - # Sender-produced ID - - id = msg.properties()["id"] - - print(f"Handling prompt {id}...", flush=True) - - prompt = v.system + "\n\n" + v.prompt + prompt = system + "\n\n" + prompt try: - # FIXME: Rate limits - - with __class__.text_completion_metric.time(): - - resp = self.openai.chat.completions.create( - model=self.model, - messages=[ - {"role": "user", "content": prompt} - ] - #temperature=self.temperature, - #max_tokens=self.max_output, - #top_p=1, - #frequency_penalty=0, - #presence_penalty=0, - #response_format={ - # "type": "text" - #} - ) + resp = self.openai.chat.completions.create( + model=self.model, + messages=[ + {"role": "user", "content": prompt} + ] + #temperature=self.temperature, + #max_tokens=self.max_output, + #top_p=1, + #frequency_penalty=0, + #presence_penalty=0, + #response_format={ + # "type": "text" + #} + ) inputtokens = resp.usage.prompt_tokens outputtokens = resp.usage.completion_tokens @@ -114,48 +73,26 @@ class Processor(ConsumerProducer): print(f"Input Tokens: {inputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True) - print("Send response...", flush=True) - r = TextCompletionResponse( - response=resp.choices[0].message.content, - error=None, - in_token=inputtokens, - out_token=outputtokens, - model="llama.cpp" + resp = LlmResult( + text = resp.choices[0].message.content, + in_token = inputtokens, + out_token = outputtokens, + model = "llama.cpp", ) - await self.send(r, properties={"id": id}) - print("Done.", flush=True) + return resp # SLM, presumably there aren't rate limits except Exception as e: print(f"Exception: {e}") - - print("Send error 
response...", flush=True) - - r = TextCompletionResponse( - error=Error( - type = "llm-error", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, - ) - - await self.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + raise e @staticmethod def add_args(parser): - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + LlmService.add_args(parser) parser.add_argument( '-m', '--model', @@ -184,7 +121,5 @@ class Processor(ConsumerProducer): ) def run(): - - Processor.launch(module, __doc__) - + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/lmstudio/llm.py b/trustgraph-flow/trustgraph/model/text_completion/lmstudio/llm.py index 05ff18a6..c64bd4fa 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/lmstudio/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/lmstudio/llm.py @@ -5,33 +5,23 @@ Input is prompt, output is response. """ from openai import OpenAI -from prometheus_client import Histogram import os -from .... schema import TextCompletionRequest, TextCompletionResponse, Error -from .... schema import text_completion_request_queue -from .... schema import text_completion_response_queue -from .... log_level import LogLevel -from .... base import ConsumerProducer from .... exceptions import TooManyRequests +from .... base import LlmService, LlmResult -module = "text-completion" +default_ident = "text-completion" -default_input_queue = text_completion_request_queue -default_output_queue = text_completion_response_queue default_subscriber = module default_model = 'gemma3:9b' default_url = os.getenv("LMSTUDIO_URL", "http://localhost:1234/") default_temperature = 0.0 default_max_output = 4096 -class Processor(ConsumerProducer): +class Processor(LlmService): def __init__(self, **params): - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) model = params.get("model", default_model) url = params.get("url", default_url) temperature = params.get("temperature", default_temperature) @@ -39,11 +29,6 @@ class Processor(ConsumerProducer): super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, - "input_schema": TextCompletionRequest, - "output_schema": TextCompletionResponse, "model": model, "temperature": temperature, "max_output": max_output, @@ -51,19 +36,6 @@ class Processor(ConsumerProducer): } ) - if not hasattr(__class__, "text_completion_metric"): - __class__.text_completion_metric = Histogram( - 'text_completion_duration', - 'Text completion duration (seconds)', - buckets=[ - 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, - 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, - 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, - 30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0, - 120.0 - ] - ) - self.model = model self.url = url + "v1/" self.temperature = temperature @@ -75,42 +47,30 @@ class Processor(ConsumerProducer): print("Initialised", flush=True) - async def handle(self, msg): + async def generate_content(self, system, prompt): - v = msg.value() - - # Sender-produced ID - - id = msg.properties()["id"] - - print(f"Handling prompt {id}...", flush=True) - - prompt = v.system + "\n\n" + v.prompt + prompt = system + "\n\n" + prompt try: - # FIXME: Rate limits + print(prompt) - with 
__class__.text_completion_metric.time(): + resp = self.openai.chat.completions.create( + model=self.model, + messages=[ + {"role": "user", "content": prompt} + ] + #temperature=self.temperature, + #max_tokens=self.max_output, + #top_p=1, + #frequency_penalty=0, + #presence_penalty=0, + #response_format={ + # "type": "text" + #} + ) - print(prompt) - - resp = self.openai.chat.completions.create( - model=self.model, - messages=[ - {"role": "user", "content": prompt} - ] - #temperature=self.temperature, - #max_tokens=self.max_output, - #top_p=1, - #frequency_penalty=0, - #presence_penalty=0, - #response_format={ - # "type": "text" - #} - ) - - print(resp) + print(resp) inputtokens = resp.usage.prompt_tokens outputtokens = resp.usage.completion_tokens @@ -119,48 +79,26 @@ class Processor(ConsumerProducer): print(f"Input Tokens: {inputtokens}", flush=True) print(f"Output Tokens: {outputtokens}", flush=True) - print("Send response...", flush=True) - r = TextCompletionResponse( - response=resp.choices[0].message.content, - error=None, - in_token=inputtokens, - out_token=outputtokens, - model=self.model, + resp = LlmResult( + text = resp.choices[0].message.content, + in_token = inputtokens, + out_token = outputtokens, + model = self.model ) - await self.send(r, properties={"id": id}) - print("Done.", flush=True) + return resp # SLM, presumably there aren't rate limits except Exception as e: print(f"Exception: {e}") - - print("Send error response...", flush=True) - - r = TextCompletionResponse( - error=Error( - type = "llm-error", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, - ) - - await self.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + raise e @staticmethod def add_args(parser): - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + LlmService.add_args(parser) parser.add_argument( '-m', '--model', @@ -189,5 +127,5 @@ class Processor(ConsumerProducer): ) def run(): - Processor.launch(module, __doc__) - + + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/mistral/llm.py b/trustgraph-flow/trustgraph/model/text_completion/mistral/llm.py index 10257cdf..93eccb35 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/mistral/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/mistral/llm.py @@ -5,33 +5,22 @@ Input is prompt, output is response. """ from mistralai import Mistral -from prometheus_client import Histogram import os -from .... schema import TextCompletionRequest, TextCompletionResponse, Error -from .... schema import text_completion_request_queue -from .... schema import text_completion_response_queue -from .... log_level import LogLevel -from .... base import ConsumerProducer from .... exceptions import TooManyRequests +from .... 
base import LlmService, LlmResult

-module = "text-completion"
+default_ident = "text-completion"

-default_input_queue = text_completion_request_queue
-default_output_queue = text_completion_response_queue
-default_subscriber = module
 default_model = 'ministral-8b-latest'
 default_temperature = 0.0
 default_max_output = 4096
 default_api_key = os.getenv("MISTRAL_TOKEN")

-class Processor(ConsumerProducer):
+class Processor(LlmService):

     def __init__(self, **params):

-        input_queue = params.get("input_queue", default_input_queue)
-        output_queue = params.get("output_queue", default_output_queue)
-        subscriber = params.get("subscriber", default_subscriber)
         model = params.get("model", default_model)
         api_key = params.get("api_key", default_api_key)
         temperature = params.get("temperature", default_temperature)
@@ -42,30 +31,12 @@ class Processor(ConsumerProducer):

         super(Processor, self).__init__(
             **params | {
-                "input_queue": input_queue,
-                "output_queue": output_queue,
-                "subscriber": subscriber,
-                "input_schema": TextCompletionRequest,
-                "output_schema": TextCompletionResponse,
                 "model": model,
                 "temperature": temperature,
                 "max_output": max_output,
             }
         )

-        if not hasattr(__class__, "text_completion_metric"):
-            __class__.text_completion_metric = Histogram(
-                'text_completion_duration',
-                'Text completion duration (seconds)',
-                buckets=[
-                    0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
-                    8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
-                    17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
-                    30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
-                    120.0
-                ]
-            )
-
         self.model = model
         self.temperature = temperature
         self.max_output = max_output
@@ -73,44 +44,34 @@ class Processor(ConsumerProducer):

         print("Initialised", flush=True)

-    async def handle(self, msg):
+    async def generate_content(self, system, prompt):

-        v = msg.value()
-
-        # Sender-produced ID
-
-        id = msg.properties()["id"]
-
-        print(f"Handling prompt {id}...", flush=True)
-
-        prompt = v.system + "\n\n" + v.prompt
+        prompt = system + "\n\n" + prompt

         try:

-            with __class__.text_completion_metric.time():
-
-                resp = self.mistral.chat.complete(
-                    model=self.model,
-                    messages=[
-                        {
-                            "role": "user",
-                            "content": [
-                                {
-                                    "type": "text",
-                                    "text": prompt
-                                }
-                            ]
-                        }
-                    ],
-                    temperature=self.temperature,
-                    max_tokens=self.max_output,
-                    top_p=1,
-                    frequency_penalty=0,
-                    presence_penalty=0,
-                    response_format={
-                        "type": "text"
+            resp = self.mistral.chat.complete(
+                model=self.model,
+                messages=[
+                    {
+                        "role": "user",
+                        "content": [
+                            {
+                                "type": "text",
+                                "text": prompt
+                            }
+                        ]
                     }
-                )
+                ],
+                temperature=self.temperature,
+                max_tokens=self.max_output,
+                top_p=1,
+                frequency_penalty=0,
+                presence_penalty=0,
+                response_format={
+                    "type": "text"
+                }
+            )

             inputtokens = resp.usage.prompt_tokens
             outputtokens = resp.usage.completion_tokens
@@ -118,17 +79,13 @@ class Processor(ConsumerProducer):

             print(f"Input Tokens: {inputtokens}", flush=True)
             print(f"Output Tokens: {outputtokens}", flush=True)

-            print("Send response...", flush=True)
-            r = TextCompletionResponse(
-                response=resp.choices[0].message.content,
-                error=None,
-                in_token=inputtokens,
-                out_token=outputtokens,
-                model=self.model
+            resp = LlmResult(
+                text = resp.choices[0].message.content,
+                in_token = inputtokens,
+                out_token = outputtokens,
+                model = self.model
             )
-            await self.send(r, properties={"id": id})
-
-            print("Done.", flush=True)
+            return resp

         # FIXME: Wrong exception. The MistralAI library has retry logic
         # so retry-able errors are retried transparently.
It means we @@ -148,31 +104,12 @@ class Processor(ConsumerProducer): # Apart from rate limits, treat all exceptions as unrecoverable print(f"Exception: {e}") - - print("Send error response...", flush=True) - - r = TextCompletionResponse( - error=Error( - type = "llm-error", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, - ) - - await self.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + raise e @staticmethod def add_args(parser): - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + LlmService.add_args(parser) parser.add_argument( '-m', '--model', @@ -201,7 +138,5 @@ class Processor(ConsumerProducer): ) def run(): - - Processor.launch(module, __doc__) - + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py b/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py index 91e627e3..6afe0aea 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py @@ -5,87 +5,40 @@ Input is prompt, output is response. """ from ollama import Client -from prometheus_client import Histogram, Info import os -from .... schema import TextCompletionRequest, TextCompletionResponse, Error -from .... schema import text_completion_request_queue -from .... schema import text_completion_response_queue -from .... log_level import LogLevel -from .... base import ConsumerProducer from .... exceptions import TooManyRequests +from .... base import LlmService, LlmResult -module = "text-completion" +default_ident = "text-completion" -default_input_queue = text_completion_request_queue -default_output_queue = text_completion_response_queue -default_subscriber = module default_model = 'gemma2:9b' default_ollama = os.getenv("OLLAMA_HOST", 'http://localhost:11434') -class Processor(ConsumerProducer): +class Processor(LlmService): def __init__(self, **params): - input_queue = params.get("input_queue", default_input_queue) - output_queue = params.get("output_queue", default_output_queue) - subscriber = params.get("subscriber", default_subscriber) model = params.get("model", default_model) ollama = params.get("ollama", default_ollama) super(Processor, self).__init__( **params | { - "input_queue": input_queue, - "output_queue": output_queue, - "subscriber": subscriber, "model": model, "ollama": ollama, - "input_schema": TextCompletionRequest, - "output_schema": TextCompletionResponse, } ) - if not hasattr(__class__, "text_completion_metric"): - __class__.text_completion_metric = Histogram( - 'text_completion_duration', - 'Text completion duration (seconds)', - buckets=[ - 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, - 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, - 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, - 30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0, - 120.0 - ] - ) - - if not hasattr(__class__, "model_metric"): - __class__.model_metric = Info( - 'model', 'Model information' - ) - - __class__.model_metric.info({ - "model": model, - "ollama": ollama, - }) - self.model = model self.llm = Client(host=ollama) - async def handle(self, msg): + async def generate_content(self, system, prompt): - v = msg.value() - - # Sender-produced ID - id = msg.properties()["id"] - - print(f"Handling prompt {id}...", flush=True) - - prompt = v.system + "\n\n" + v.prompt + prompt = system + "\n\n" + prompt try: - with __class__.text_completion_metric.time(): - 
response = self.llm.generate(self.model, prompt) + response = self.llm.generate(self.model, prompt) response_text = response['response'] print("Send response...", flush=True) @@ -94,42 +47,26 @@ class Processor(ConsumerProducer): inputtokens = int(response['prompt_eval_count']) outputtokens = int(response['eval_count']) - r = TextCompletionResponse(response=response_text, error=None, in_token=inputtokens, out_token=outputtokens, model="ollama") + resp = LlmResult( + text = response_text, + in_token = inputtokens, + out_token = outputtokens, + model = self.model + ) - await self.send(r, properties={"id": id}) - - print("Done.", flush=True) + return resp # SLM, presumably no rate limits except Exception as e: print(f"Exception: {e}") - - print("Send error response...", flush=True) - - r = TextCompletionResponse( - error=Error( - type = "llm-error", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, - ) - - await self.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + raise e @staticmethod def add_args(parser): - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + LlmService.add_args(parser) parser.add_argument( '-m', '--model', @@ -145,6 +82,4 @@ class Processor(ConsumerProducer): def run(): - Processor.launch(module, __doc__) - - + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py b/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py index 2479034d..c8bfcdda 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py @@ -5,20 +5,13 @@ Input is prompt, output is response. """ from openai import OpenAI, RateLimitError -from prometheus_client import Histogram import os -from .... schema import TextCompletionRequest, TextCompletionResponse, Error -from .... schema import text_completion_request_queue -from .... schema import text_completion_response_queue -from .... log_level import LogLevel -from .... base import ConsumerProducer from .... exceptions import TooManyRequests +from .... 
base import LlmService, LlmResult

-module = "text-completion"
+default_ident = "text-completion"

-default_input_queue = text_completion_request_queue
-default_output_queue = text_completion_response_queue
-default_subscriber = module
+default_subscriber = default_ident
 default_model = 'gpt-3.5-turbo'
 default_temperature = 0.0
@@ -26,13 +19,10 @@
 default_max_output = 4096
 default_api_key = os.getenv("OPENAI_TOKEN")
 default_base_url = os.getenv("OPENAI_BASE_URL", None)

-class Processor(ConsumerProducer):
+class Processor(LlmService):

     def __init__(self, **params):

-        input_queue = params.get("input_queue", default_input_queue)
-        output_queue = params.get("output_queue", default_output_queue)
-        subscriber = params.get("subscriber", default_subscriber)
         model = params.get("model", default_model)
         api_key = params.get("api_key", default_api_key)
         base_url = params.get("base_url", default_base_url)
@@ -44,11 +34,6 @@ class Processor(ConsumerProducer):

         super(Processor, self).__init__(
             **params | {
-                "input_queue": input_queue,
-                "output_queue": output_queue,
-                "subscriber": subscriber,
-                "input_schema": TextCompletionRequest,
-                "output_schema": TextCompletionResponse,
                 "model": model,
                 "temperature": temperature,
                 "max_output": max_output,
@@ -56,19 +41,6 @@ class Processor(ConsumerProducer):
             }
         )

-        if not hasattr(__class__, "text_completion_metric"):
-            __class__.text_completion_metric = Histogram(
-                'text_completion_duration',
-                'Text completion duration (seconds)',
-                buckets=[
-                    0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
-                    8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0,
-                    17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0,
-                    30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 80.0, 100.0,
-                    120.0
-                ]
-            )
-
         self.model = model
         self.temperature = temperature
         self.max_output = max_output
@@ -76,44 +48,34 @@ class Processor(ConsumerProducer):

         print("Initialised", flush=True)

-    async def handle(self, msg):
+    async def generate_content(self, system, prompt):

-        v = msg.value()
-
-        # Sender-produced ID
-
-        id = msg.properties()["id"]
-
-        print(f"Handling prompt {id}...", flush=True)
-
-        prompt = v.system + "\n\n" + v.prompt
+        prompt = system + "\n\n" + prompt

         try:

-            with __class__.text_completion_metric.time():
-
-                resp = self.openai.chat.completions.create(
-                    model=self.model,
-                    messages=[
-                        {
-                            "role": "user",
-                            "content": [
-                                {
-                                    "type": "text",
-                                    "text": prompt
-                                }
-                            ]
-                        }
-                    ],
-                    temperature=self.temperature,
-                    max_tokens=self.max_output,
-                    top_p=1,
-                    frequency_penalty=0,
-                    presence_penalty=0,
-                    response_format={
-                        "type": "text"
+            resp = self.openai.chat.completions.create(
+                model=self.model,
+                messages=[
+                    {
+                        "role": "user",
+                        "content": [
+                            {
+                                "type": "text",
+                                "text": prompt
+                            }
+                        ]
                     }
-                )
+                ],
+                temperature=self.temperature,
+                max_tokens=self.max_output,
+                top_p=1,
+                frequency_penalty=0,
+                presence_penalty=0,
+                response_format={
+                    "type": "text"
+                }
+            )

             inputtokens = resp.usage.prompt_tokens
             outputtokens = resp.usage.completion_tokens
@@ -121,17 +83,14 @@ class Processor(ConsumerProducer):

             print(f"Input Tokens: {inputtokens}", flush=True)
             print(f"Output Tokens: {outputtokens}", flush=True)

-            print("Send response...", flush=True)
-            r = TextCompletionResponse(
-                response=resp.choices[0].message.content,
-                error=None,
-                in_token=inputtokens,
-                out_token=outputtokens,
-                model=self.model
+            resp = LlmResult(
+                text = resp.choices[0].message.content,
+                in_token = inputtokens,
+                out_token = outputtokens,
+                model = self.model
             )
-            await self.send(r, properties={"id": id})
-            print("Done.", flush=True)
+            return resp

         # FIXME: Wrong exception, don't know 
what this LLM throws # for a rate limit @@ -145,31 +104,12 @@ class Processor(ConsumerProducer): # Apart from rate limits, treat all exceptions as unrecoverable print(f"Exception: {e}") - - print("Send error response...", flush=True) - - r = TextCompletionResponse( - error=Error( - type = "llm-error", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, - ) - - await self.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + raise e @staticmethod def add_args(parser): - ConsumerProducer.add_args( - parser, default_input_queue, default_subscriber, - default_output_queue, - ) + LlmService.add_args(parser) parser.add_argument( '-m', '--model', @@ -204,7 +144,5 @@ class Processor(ConsumerProducer): ) def run(): - - Processor.launch(module, __doc__) - + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py b/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py index 3594b76d..854be961 100755 --- a/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py +++ b/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py @@ -105,11 +105,12 @@ class Processor(LlmService): safety_settings=self.safety_settings ) - resp = LlmResult() - resp.text = response.text - resp.in_token = response.usage_metadata.prompt_token_count - resp.out_token = response.usage_metadata.candidates_token_count - resp.model = self.model + resp = LlmResult( + text = response.text, + in_token = response.usage_metadata.prompt_token_count, + out_token = response.usage_metadata.candidates_token_count, + model = self.model + ) print(f"Input Tokens: {resp.in_token}", flush=True) print(f"Output Tokens: {resp.out_token}", flush=True)
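
Note on the new provider contract (illustrative sketch, not part of the patch): after this refactor every text-completion processor subclasses LlmService, implements async generate_content(system, prompt), returns an LlmResult, and re-raises rate-limit conditions as TooManyRequests; queue wiring, request/response schemas and the Prometheus histogram live in the shared base class. The sketch below shows the minimal shape of a converted processor. The "echo" model, the fake token counts and the absolute trustgraph.base / trustgraph.exceptions import paths are assumptions for illustration only.

# Minimal sketch of a provider written against the new LlmService base.
# Hypothetical example - not one of the files changed by this patch.

from trustgraph.base import LlmService, LlmResult   # assumed absolute import path
from trustgraph.exceptions import TooManyRequests

default_ident = "text-completion"
default_model = "echo"

class Processor(LlmService):

    def __init__(self, **params):
        model = params.get("model", default_model)
        super(Processor, self).__init__(**params | {"model": model})
        self.model = model

    async def generate_content(self, system, prompt):
        try:
            # A real provider would call its SDK here; this one just echoes.
            text = system + "\n\n" + prompt
            return LlmResult(
                text=text,
                in_token=len(prompt.split()),    # placeholder token counts
                out_token=len(text.split()),
                model=self.model,
            )
        except Exception as e:
            # Provider rate limits should be re-raised as TooManyRequests;
            # anything else propagates so the base class reports the error.
            raise e

def run():
    Processor.launch(default_ident, __doc__)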