diff --git a/Containerfile b/Containerfile
index 1bb0a7fd..7f80a514 100644
--- a/Containerfile
+++ b/Containerfile
@@ -13,7 +13,7 @@ RUN dnf install -y python3 python3-pip python3-wheel python3-aiohttp \
 
 RUN pip3 install torch --index-url https://download.pytorch.org/whl/cpu
 
-RUN pip3 install anthropic boto3 cohere openai google-cloud-aiplatform \
+RUN pip3 install anthropic boto3 cohere openai google-cloud-aiplatform ollama \
     langchain langchain-core langchain-huggingface langchain-text-splitters \
     langchain-community pymilvus sentence-transformers transformers \
     huggingface-hub pulsar-client cassandra-driver pyarrow pyyaml \
diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml
index f459dad9..60a8e488 100644
--- a/prometheus/prometheus.yml
+++ b/prometheus/prometheus.yml
@@ -29,6 +29,7 @@ scrape_configs:
         - 'kg-extract-definitions:8000'
         - 'kg-extract-topics:8000'
         - 'kg-extract-relationships:8000'
+        - 'metering:8000'
         - 'store-graph-embeddings:8000'
         - 'store-triples:8000'
         - 'text-completion:8000'
diff --git a/requirements.txt b/requirements.txt
index 9a49a5aa..0d269066 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -20,3 +20,4 @@ pyyaml
 prometheus-client
 pyarrow
 boto3
+ollama
diff --git a/scripts/metering b/scripts/metering
new file mode 100644
index 00000000..7f1d0e12
--- /dev/null
+++ b/scripts/metering
@@ -0,0 +1,5 @@
+#!/usr/bin/env python3
+
+from trustgraph.metering import run
+
+run()
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 8240a805..e50b4e51 100644
--- a/setup.py
+++ b/setup.py
@@ -81,6 +81,7 @@ setuptools.setup(
         "scripts/load-pdf",
         "scripts/load-text",
         "scripts/load-triples",
+        "scripts/metering",
         "scripts/object-extract-row",
         "scripts/oe-write-milvus",
         "scripts/pdf-decoder",
diff --git a/trustgraph/metering/__init__.py b/trustgraph/metering/__init__.py
new file mode 100644
index 00000000..0ed03774
--- /dev/null
+++ b/trustgraph/metering/__init__.py
@@ -0,0 +1,3 @@
+
+from . counter import *
+
diff --git a/trustgraph/metering/__main__.py b/trustgraph/metering/__main__.py
new file mode 100755
index 00000000..802f2b8d
--- /dev/null
+++ b/trustgraph/metering/__main__.py
@@ -0,0 +1,7 @@
+#!/usr/bin/env python3
+
+from . counter import run
+
+if __name__ == '__main__':
+    run()
+
diff --git a/trustgraph/metering/counter.py b/trustgraph/metering/counter.py
new file mode 100644
index 00000000..0a33f413
--- /dev/null
+++ b/trustgraph/metering/counter.py
@@ -0,0 +1,77 @@
+"""
+Simple token counter for each LLM response.
+"""
+
+from prometheus_client import Histogram, Info
+from . pricelist import price_list
+
+from .. schema import TextCompletionResponse, Error
+from .. schema import text_completion_response_queue
+from .. log_level import LogLevel
+from .. base import Consumer
+
+module = ".".join(__name__.split(".")[1:-1])
+
+default_input_queue = text_completion_response_queue
+default_subscriber = module
+
+
+class Processor(Consumer):
+
+    def __init__(self, **params):
+
+        input_queue = params.get("input_queue", default_input_queue)
+        subscriber = params.get("subscriber", default_subscriber)
+
+        super(Processor, self).__init__(
+            **params | {
+                "input_queue": input_queue,
+                "subscriber": subscriber,
+                "input_schema": TextCompletionResponse,
+            }
+        )
+
+    def get_prices(self, prices, modelname):
+        for model in prices["price_list"]:
+            if model["model_name"] == modelname:
+                return model["input_price"], model["output_price"]
+        return None, None # Return None if model is not found
+
+    def handle(self, msg):
+
+        v = msg.value()
+        modelname = v.model
+
+        # Sender-produced ID
+        id = msg.properties()["id"]
+
+        print(f"Handling response {id}...", flush=True)
+
+        num_in = v.in_token
+        num_out = v.out_token
+
+        model_input_price, model_output_price = self.get_prices(price_list, modelname)
+
+        # Don't attempt costing for models missing from the price list
+        if model_input_price is None or model_output_price is None:
+            print(f"No prices known for model {modelname}", flush=True)
+            return
+
+        cost_in = num_in * model_input_price
+        cost_out = num_out * model_output_price
+        cost_per_call = cost_in + cost_out
+
+        print(f"Input Tokens: {num_in}", flush=True)
+        print(f"Output Tokens: {num_out}", flush=True)
+        print(f"Cost for call: ${cost_per_call:.6f}", flush=True)
+
+    @staticmethod
+    def add_args(parser):
+
+        Consumer.add_args(
+            parser, default_input_queue, default_subscriber,
+        )
+
+def run():
+
+    Processor.start(module, __doc__)
\ No newline at end of file
diff --git a/trustgraph/metering/pricelist.py b/trustgraph/metering/pricelist.py
new file mode 100644
index 00000000..bffa9bec
--- /dev/null
+++ b/trustgraph/metering/pricelist.py
@@ -0,0 +1,49 @@
+price_list = {
+    "price_list": [
+        {
+            "model_name": "mistral.mistral-large-2407-v1:0",
+            "input_price": 0.000004,
+            "output_price": 0.000012
+        },
+        {
+            "model_name": "meta.llama3-1-405b-instruct-v1:0",
+            "input_price": 0.00000532,
+            "output_price": 0.000016
+        },
+        {
+            "model_name": "mistral.mixtral-8x7b-instruct-v0:1",
+            "input_price": 0.00000045,
+            "output_price": 0.0000007
+        },
+        {
+            "model_name": "meta.llama3-1-70b-instruct-v1:0",
+            "input_price": 0.00000099,
+            "output_price": 0.00000099
+        },
+        {
+            "model_name": "meta.llama3-1-8b-instruct-v1:0",
+            "input_price": 0.00000022,
+            "output_price": 0.00000022
+        },
+        {
+            "model_name": "anthropic.claude-3-haiku-20240307-v1:0",
+            "input_price": 0.00000025,
+            "output_price": 0.00000125
+        },
+        {
+            "model_name": "anthropic.claude-3-5-sonnet-20240620-v1:0",
+            "input_price": 0.000003,
+            "output_price": 0.000015
+        },
+        {
+            "model_name": "cohere.command-r-plus-v1:0",
+            "input_price": 0.0000030,
+            "output_price": 0.0000150
+        },
+        {
+            "model_name": "ollama",
+            "input_price": 0,
+            "output_price": 0
+        }
+    ]
+}
\ No newline at end of file
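The metering service prices each call by multiplying the token counts carried in the response by the per-token rates in pricelist.py; a model missing from the list yields None prices and the call is skipped. A minimal sketch of the same arithmetic outside the Pulsar consumer (the function name and token counts are illustrative; the rates are the Claude 3 Haiku entries from the price list above):

    # Illustrative sketch of the metering arithmetic used by counter.py.
    def call_cost(in_tokens, out_tokens, input_price, output_price):
        # cost = input tokens * input rate + output tokens * output rate
        return in_tokens * input_price + out_tokens * output_price

    # anthropic.claude-3-haiku-20240307-v1:0: 0.00000025 in, 0.00000125 out
    print(f"${call_cost(1000, 500, 0.00000025, 0.00000125):.6f}")  # $0.000875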
diff --git a/trustgraph/model/text_completion/bedrock/llm.py b/trustgraph/model/text_completion/bedrock/llm.py
index fccf6353..0d050261 100755
--- a/trustgraph/model/text_completion/bedrock/llm.py
+++ b/trustgraph/model/text_completion/bedrock/llm.py
@@ -209,14 +209,23 @@ class Processor(ConsumerProducer):
         # Use Mistral as default
         else:
             response_body = json.loads(response.get("body").read())
-            outputtext = response_body['outputs'][0]['text']
+            outputtext = response_body['outputs'][0]['text']
+
+        metadata = response['ResponseMetadata']['HTTPHeaders']
+        inputtokens = int(metadata['x-amzn-bedrock-input-token-count'])
+        outputtokens = int(metadata['x-amzn-bedrock-output-token-count'])
 
         print(outputtext, flush=True)
+        print(f"Input Tokens: {inputtokens}", flush=True)
+        print(f"Output Tokens: {outputtokens}", flush=True)
         print("Send response...", flush=True)
 
         r = TextCompletionResponse(
             error=None,
-            response=outputtext
+            response=outputtext,
+            in_token=inputtokens,
+            out_token=outputtokens,
+            model=str(self.model),
         )
 
         self.send(r, properties={"id": id})
@@ -236,6 +245,9 @@ class Processor(ConsumerProducer):
                     message = str(e),
                 ),
                 response=None,
+                in_token=None,
+                out_token=None,
+                model=None,
             )
 
             self.producer.send(r, properties={"id": id})
@@ -254,6 +266,9 @@ class Processor(ConsumerProducer):
                     message = str(e),
                 ),
                 response=None,
+                in_token=None,
+                out_token=None,
+                model=None,
             )
 
             self.consumer.acknowledge(msg)
diff --git a/trustgraph/model/text_completion/ollama/llm.py b/trustgraph/model/text_completion/ollama/llm.py
index 93d89720..b506b3cd 100755
--- a/trustgraph/model/text_completion/ollama/llm.py
+++ b/trustgraph/model/text_completion/ollama/llm.py
@@ -4,7 +4,7 @@ Simple LLM service, performs text prompt completion using an Ollama
 service. Input is prompt, output is response.
 """
 
-from langchain_community.llms import Ollama
+from ollama import Client
 from prometheus_client import Histogram, Info
 
 from .... schema import TextCompletionRequest, TextCompletionResponse, Error
@@ -67,7 +67,8 @@ class Processor(ConsumerProducer):
                 "ollama": ollama,
             })
 
-        self.llm = Ollama(base_url=ollama, model=model)
+        self.model = model
+        self.llm = Client(host=ollama)
 
     def handle(self, msg):
 
@@ -83,11 +84,16 @@ class Processor(ConsumerProducer):
         try:
 
             with __class__.text_completion_metric.time():
-                response = self.llm.invoke(prompt)
+                response = self.llm.generate(self.model, prompt)
+                response_text = response['response']
 
             print("Send response...", flush=True)
+            print(response_text, flush=True)
 
-            r = TextCompletionResponse(response=response, error=None)
+            inputtokens = int(response['prompt_eval_count'])
+            outputtokens = int(response['eval_count'])
+
+            r = TextCompletionResponse(response=response_text, error=None, in_token=inputtokens, out_token=outputtokens, model="ollama")
 
             self.send(r, properties={"id": id})
 
@@ -105,6 +111,9 @@ class Processor(ConsumerProducer):
                     message = str(e),
                 ),
                 response=None,
+                in_token=None,
+                out_token=None,
+                model=None,
             )
 
             self.producer.send(r, properties={"id": id})
@@ -123,6 +132,9 @@ class Processor(ConsumerProducer):
                     message = str(e),
                 ),
                 response=None,
+                in_token=None,
+                out_token=None,
+                model=None,
            )
 
             self.producer.send(r, properties={"id": id})
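Swapping langchain_community's Ollama wrapper for the ollama client is what exposes the token counts: generate() returns the completion together with prompt_eval_count and eval_count, the input and output tallies that the response schema now carries. A standalone sketch of that call (the host URL and model name are assumptions, not values from this change):

    # Minimal sketch of the ollama client call adopted above.
    from ollama import Client

    client = Client(host="http://localhost:11434")  # assumed endpoint
    response = client.generate("gemma2", "Why is the sky blue?")  # assumed model

    print(response["response"])            # completion text
    print(response["prompt_eval_count"])   # input token count
    print(response["eval_count"])          # output token count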
{inputtokens}", flush=True) + print(f"Output Tokens: {outputtokens}", flush=True) print("Send response...", flush=True) r = TextCompletionResponse( error=None, - response=outputtext + response=outputtext, + in_token=inputtokens, + out_token=outputtokens, + model=str(self.model), ) self.send(r, properties={"id": id}) @@ -236,6 +245,9 @@ class Processor(ConsumerProducer): message = str(e), ), response=None, + in_token=None, + out_token=None, + model=None, ) self.producer.send(r, properties={"id": id}) @@ -254,6 +266,9 @@ class Processor(ConsumerProducer): message = str(e), ), response=None, + in_token=None, + out_token=None, + model=None, ) self.consumer.acknowledge(msg) diff --git a/trustgraph/model/text_completion/ollama/llm.py b/trustgraph/model/text_completion/ollama/llm.py index 93d89720..b506b3cd 100755 --- a/trustgraph/model/text_completion/ollama/llm.py +++ b/trustgraph/model/text_completion/ollama/llm.py @@ -4,7 +4,7 @@ Simple LLM service, performs text prompt completion using an Ollama service. Input is prompt, output is response. """ -from langchain_community.llms import Ollama +from ollama import Client from prometheus_client import Histogram, Info from .... schema import TextCompletionRequest, TextCompletionResponse, Error @@ -67,7 +67,8 @@ class Processor(ConsumerProducer): "ollama": ollama, }) - self.llm = Ollama(base_url=ollama, model=model) + self.model = model + self.llm = Client(host=ollama) def handle(self, msg): @@ -83,11 +84,16 @@ class Processor(ConsumerProducer): try: with __class__.text_completion_metric.time(): - response = self.llm.invoke(prompt) + response = self.llm.generate(self.model, prompt) + response_text = response['response'] print("Send response...", flush=True) + print(response_text, flush=True) - r = TextCompletionResponse(response=response, error=None) + inputtokens = int(response['prompt_eval_count']) + outputtokens = int(response['eval_count']) + + r = TextCompletionResponse(response=response_text, error=None, in_token=inputtokens, out_token=outputtokens, model="ollama") self.send(r, properties={"id": id}) @@ -105,6 +111,9 @@ class Processor(ConsumerProducer): message = str(e), ), response=None, + in_token=None, + out_token=None, + model=None, ) self.producer.send(r, properties={"id": id}) @@ -123,6 +132,9 @@ class Processor(ConsumerProducer): message = str(e), ), response=None, + in_token=None, + out_token=None, + model=None, ) self.producer.send(r, properties={"id": id}) diff --git a/trustgraph/schema/models.py b/trustgraph/schema/models.py index cf73a203..2196a3d2 100644 --- a/trustgraph/schema/models.py +++ b/trustgraph/schema/models.py @@ -1,5 +1,5 @@ -from pulsar.schema import Record, String, Array, Double +from pulsar.schema import Record, String, Array, Double, Integer from . topic import topic from . types import Error @@ -14,6 +14,9 @@ class TextCompletionRequest(Record): class TextCompletionResponse(Record): error = Error() response = String() + in_token = Integer() + out_token = Integer() + model = String() text_completion_request_queue = topic( 'text-completion', kind='non-persistent', namespace='request'