diff --git a/grafana/dashboards/dashboard.json b/grafana/dashboards/dashboard.json
index 04561863..c484dffa 100644
--- a/grafana/dashboards/dashboard.json
+++ b/grafana/dashboards/dashboard.json
@@ -577,7 +577,7 @@
           "disableTextWrap": false,
           "editorMode": "builder",
           "exemplar": false,
-          "expr": "increase(processing_count_total{status!=\"success\"}[$__rate_interval])",
+          "expr": "sum by(job) (increase(rate_limit_count_total[$__rate_interval]))",
           "format": "time_series",
           "fullMetaSearch": false,
           "includeNullMetadata": true,
@@ -588,7 +588,7 @@
           "useBackend": false
         }
       ],
-      "title": "Errors",
+      "title": "Rate limit events",
       "type": "timeseries"
     },
     {
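The panel above now graphs the new rate-limit counter rather than non-success outcomes of `processing_count_total`. Note that `prometheus_client` appends the `_total` suffix to counters, which is why the `Counter('rate_limit_count', ...)` defined in the consumer below is queried as `rate_limit_count_total`. For alerting on sustained throttling, something along the lines of `sum by(job) (increase(rate_limit_count_total[5m])) > 0` would be the natural companion expression (illustrative only, not part of this change).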
diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py
index f346f1bc..521dd3c1 100644
--- a/trustgraph-base/trustgraph/base/consumer.py
+++ b/trustgraph-base/trustgraph/base/consumer.py
@@ -7,6 +7,9 @@ import time
 from . base_processor import BaseProcessor
 from .. exceptions import TooManyRequests
 
+default_rate_limit_retry = 10
+default_rate_limit_timeout = 7200
+
 class Consumer(BaseProcessor):
 
     def __init__(self, **params):
@@ -22,11 +25,18 @@
 
         super(Consumer, self).__init__(**params)
 
-        input_queue = params.get("input_queue")
-        subscriber = params.get("subscriber")
-        input_schema = params.get("input_schema")
+        self.input_queue = params.get("input_queue")
+        self.subscriber = params.get("subscriber")
+        self.input_schema = params.get("input_schema")
 
-        if input_schema == None:
+        self.rate_limit_retry = params.get(
+            "rate_limit_retry", default_rate_limit_retry
+        )
+        self.rate_limit_timeout = params.get(
+            "rate_limit_timeout", default_rate_limit_timeout
+        )
+
+        if self.input_schema == None:
             raise RuntimeError("input_schema must be specified")
 
         if not hasattr(__class__, "request_metric"):
@@ -44,18 +54,27 @@
                 'processing_count', 'Processing count', ["status"]
             )
 
+        if not hasattr(__class__, "rate_limit_metric"):
+            __class__.rate_limit_metric = Counter(
+                'rate_limit_count', 'Rate limit event count',
+            )
+
         __class__.pubsub_metric.info({
-            "input_queue": input_queue,
-            "subscriber": subscriber,
-            "input_schema": input_schema.__name__,
+            "input_queue": self.input_queue,
+            "subscriber": self.subscriber,
+            "input_schema": self.input_schema.__name__,
+            "rate_limit_retry": str(self.rate_limit_retry),
+            "rate_limit_timeout": str(self.rate_limit_timeout),
         })
 
         self.consumer = self.client.subscribe(
-            input_queue, subscriber,
+            self.input_queue, self.subscriber,
             consumer_type=pulsar.ConsumerType.Shared,
-            schema=JsonSchema(input_schema),
+            schema=JsonSchema(self.input_schema),
         )
 
+        print("Initialised consumer.", flush=True)
+
     def run(self):
 
         __class__.state_metric.state('running')
@@ -64,31 +83,61 @@
 
             msg = self.consumer.receive()
 
-            try:
+            expiry = time.time() + self.rate_limit_timeout
 
-                with __class__.request_metric.time():
-                    self.handle(msg)
+            # This loop is for retry on rate-limit / resource limits
+            while True:
 
-                # Acknowledge successful processing of the message
-                self.consumer.acknowledge(msg)
+                if time.time() > expiry:
 
-                __class__.processing_metric.labels(status="success").inc()
+                    print("Gave up waiting for rate-limit retry", flush=True)
 
-            except TooManyRequests:
-                self.consumer.negative_acknowledge(msg)
-                print("TooManyRequests: will retry")
-                __class__.processing_metric.labels(status="rate-limit").inc()
-                time.sleep(5)
-                continue
+                    # Message failed to be processed, this causes it to
+                    # be retried
+                    self.consumer.negative_acknowledge(msg)
+
+                    __class__.processing_metric.labels(status="error").inc()
+
+                    # Break out of retry loop, proceed to next message
+                    break
+
+                try:
+
+                    with __class__.request_metric.time():
+                        self.handle(msg)
+
+                    # Acknowledge successful processing of the message
+                    self.consumer.acknowledge(msg)
+
+                    __class__.processing_metric.labels(status="success").inc()
+
+                    # Break out of retry loop
+                    break
+
+                except TooManyRequests:
+
+                    print("TooManyRequests: will retry...", flush=True)
+
+                    __class__.rate_limit_metric.inc()
+
+                    # Sleep
+                    time.sleep(self.rate_limit_retry)
+
+                    # Continue from retry loop, just causes a reprocessing
+                    continue
 
-            except Exception as e:
+                except Exception as e:
 
-                print("Exception:", e, flush=True)
+                    print("Exception:", e, flush=True)
 
-                # Message failed to be processed
-                self.consumer.negative_acknowledge(msg)
+                    # Message failed to be processed, this causes it to
+                    # be retried
+                    self.consumer.negative_acknowledge(msg)
 
-                __class__.processing_metric.labels(status="error").inc()
+                    __class__.processing_metric.labels(status="error").inc()
+
+                    # Break out of retry loop, proceed to next message
+                    break
 
     @staticmethod
     def add_args(parser, default_input_queue, default_subscriber):
@@ -107,3 +156,17 @@
             help=f'Queue subscriber name (default: {default_subscriber})'
         )
 
+        parser.add_argument(
+            '--rate-limit-retry',
+            type=int,
+            default=default_rate_limit_retry,
+            help=f'Rate limit retry interval in seconds (default: {default_rate_limit_retry})'
+        )
+
+        parser.add_argument(
+            '--rate-limit-timeout',
+            type=int,
+            default=default_rate_limit_timeout,
+            help=f'Rate limit timeout in seconds (default: {default_rate_limit_timeout})'
+        )
+
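With the defaults above, a throttled message is retried every 10 seconds for up to 7200 seconds, i.e. roughly 720 attempts, before being negatively acknowledged and redelivered by Pulsar. A minimal sketch of a processor built on this loop; the queue, subscriber and schema names are placeholders, not part of this change, and it needs a reachable Pulsar broker to actually run:

```python
import random

from trustgraph.base.consumer import Consumer
from trustgraph.exceptions import TooManyRequests
from trustgraph.schema import TextCompletionRequest   # illustrative schema choice

class FlakyProcessor(Consumer):

    def handle(self, msg):
        # Raising TooManyRequests makes Consumer.run() sleep for
        # rate_limit_retry seconds and call handle() again with the
        # same message; only after rate_limit_timeout seconds does it
        # give up and negatively acknowledge the message.
        if random.random() < 0.5:   # simulate an upstream 429
            raise TooManyRequests()
        print("Handled:", msg.value(), flush=True)

if __name__ == '__main__':
    FlakyProcessor(
        input_queue="example-input",        # placeholder names
        subscriber="example-subscriber",
        input_schema=TextCompletionRequest,
        rate_limit_retry=10,
        rate_limit_timeout=7200,
    ).run()
```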
diff --git a/trustgraph-base/trustgraph/base/consumer_producer.py b/trustgraph-base/trustgraph/base/consumer_producer.py
index 6d386894..be9915ce 100644
--- a/trustgraph-base/trustgraph/base/consumer_producer.py
+++ b/trustgraph-base/trustgraph/base/consumer_producer.py
@@ -4,111 +4,43 @@
 from prometheus_client import Histogram, Info, Counter, Enum
 import time
 
-from . base_processor import BaseProcessor
+from . consumer import Consumer
 from .. exceptions import TooManyRequests
 
-# FIXME: Derive from consumer? And producer?
-
-class ConsumerProducer(BaseProcessor):
+class ConsumerProducer(Consumer):
 
     def __init__(self, **params):
 
-        if not hasattr(__class__, "state_metric"):
-            __class__.state_metric = Enum(
-                'processor_state', 'Processor state',
-                states=['starting', 'running', 'stopped']
-            )
-            __class__.state_metric.state('starting')
+        super(ConsumerProducer, self).__init__(**params)
 
-        __class__.state_metric.state('starting')
-
-        input_queue = params.get("input_queue")
-        output_queue = params.get("output_queue")
-        subscriber = params.get("subscriber")
-        input_schema = params.get("input_schema")
-        output_schema = params.get("output_schema")
-
-        if not hasattr(__class__, "request_metric"):
-            __class__.request_metric = Histogram(
-                'request_latency', 'Request latency (seconds)'
-            )
+        self.output_queue = params.get("output_queue")
+        self.output_schema = params.get("output_schema")
 
         if not hasattr(__class__, "output_metric"):
             __class__.output_metric = Counter(
                 'output_count', 'Output items created'
             )
 
-        if not hasattr(__class__, "pubsub_metric"):
-            __class__.pubsub_metric = Info(
-                'pubsub', 'Pub/sub configuration'
-            )
-
-        if not hasattr(__class__, "processing_metric"):
-            __class__.processing_metric = Counter(
-                'processing_count', 'Processing count', ["status"]
-            )
-
         __class__.pubsub_metric.info({
-            "input_queue": input_queue,
-            "output_queue": output_queue,
-            "subscriber": subscriber,
-            "input_schema": input_schema.__name__,
-            "output_schema": output_schema.__name__,
+            "input_queue": self.input_queue,
+            "output_queue": self.output_queue,
+            "subscriber": self.subscriber,
+            "input_schema": self.input_schema.__name__,
+            "output_schema": self.output_schema.__name__,
+            "rate_limit_retry": str(self.rate_limit_retry),
+            "rate_limit_timeout": str(self.rate_limit_timeout),
         })
 
-        super(ConsumerProducer, self).__init__(**params)
-
-        if input_schema == None:
-            raise RuntimeError("input_schema must be specified")
-
-        if output_schema == None:
+        if self.output_schema == None:
             raise RuntimeError("output_schema must be specified")
 
         self.producer = self.client.create_producer(
-            topic=output_queue,
-            schema=JsonSchema(output_schema),
+            topic=self.output_queue,
+            schema=JsonSchema(self.output_schema),
             chunking_enabled=True,
         )
 
-        self.consumer = self.client.subscribe(
-            input_queue, subscriber,
-            consumer_type=pulsar.ConsumerType.Shared,
-            schema=JsonSchema(input_schema),
-        )
-
-    def run(self):
-
-        __class__.state_metric.state('running')
-
-        while True:
-
-            msg = self.consumer.receive()
-
-            try:
-
-                with __class__.request_metric.time():
-                    resp = self.handle(msg)
-
-                # Acknowledge successful processing of the message
-                self.consumer.acknowledge(msg)
-
-                __class__.processing_metric.labels(status="success").inc()
-
-            except TooManyRequests:
-                self.consumer.negative_acknowledge(msg)
-                print("TooManyRequests: will retry")
-                __class__.processing_metric.labels(status="rate-limit").inc()
-                time.sleep(5)
-                continue
-
-            except Exception as e:
-
-                print("Exception:", e, flush=True)
-
-                # Message failed to be processed
-                self.consumer.negative_acknowledge(msg)
-
-                __class__.processing_metric.labels(status="error").inc()
+        print("Initialised consumer/producer.", flush=True)
 
     def send(self, msg, properties={}):
         self.producer.send(msg, properties)
@@ -120,19 +52,7 @@
         default_output_queue,
     ):
 
-        BaseProcessor.add_args(parser)
-
-        parser.add_argument(
-            '-i', '--input-queue',
-            default=default_input_queue,
-            help=f'Input queue (default: {default_input_queue})'
-        )
-
-        parser.add_argument(
-            '-s', '--subscriber',
-            default=default_subscriber,
-            help=f'Queue subscriber name (default: {default_subscriber})'
-        )
+        Consumer.add_args(parser, default_input_queue, default_subscriber)
 
         parser.add_argument(
             '-o', '--output-queue',
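Since ConsumerProducer now derives from Consumer, a subclass only supplies the producer half; queue wiring, metrics and the retry loop are inherited. A rough sketch, with the `id` property convention taken from the LLM modules below and everything else (names, schema fields) illustrative:

```python
from trustgraph.base.consumer_producer import ConsumerProducer
from trustgraph.schema import TextCompletionRequest, TextCompletionResponse

class EchoProcessor(ConsumerProducer):

    def handle(self, msg):
        v = msg.value()
        # send() is provided by ConsumerProducer; acknowledgement,
        # error accounting and rate-limit retries all happen in the
        # inherited Consumer.run() loop.
        r = TextCompletionResponse(
            response=v.prompt,   # assumes the request schema's prompt field
            error=None,
            in_token=0,
            out_token=0,
            model="echo",
        )
        self.send(r, properties={"id": msg.properties()["id"]})
```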
diff --git a/trustgraph-base/trustgraph/exceptions.py b/trustgraph-base/trustgraph/exceptions.py
index 16f9956c..afe72ccc 100644
--- a/trustgraph-base/trustgraph/exceptions.py
+++ b/trustgraph-base/trustgraph/exceptions.py
@@ -8,7 +8,3 @@ class LlmError(Exception):
 
 class ParseError(Exception):
     pass
-
-
-
-
diff --git a/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py b/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py
index a9c05cc8..3b2aced5 100755
--- a/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py
+++ b/trustgraph-bedrock/trustgraph/model/text_completion/bedrock/llm.py
@@ -267,6 +267,7 @@
 
         except Exception as e:
 
+            print(type(e), flush=True)
             print(f"Exception: {e}")
 
             print("Send error response...", flush=True)
diff --git a/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py b/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py
index 4db7dbf1..90be6962 100755
--- a/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py
+++ b/trustgraph-flow/trustgraph/model/text_completion/azure/llm.py
@@ -158,25 +158,15 @@
 
         except TooManyRequests:
 
-            print("Send rate limit response...", flush=True)
+            print("Rate limit...", flush=True)
 
-            r = TextCompletionResponse(
-                error=Error(
-                    type = "rate-limit",
-                    message = str(e),
-                ),
-                response=None,
-                in_token=None,
-                out_token=None,
-                model=None,
-            )
-
-            self.producer.send(r, properties={"id": id})
-
-            self.consumer.acknowledge(msg)
+            # Leave rate limit retries to the base handler
+            raise
 
         except Exception as e:
 
+            # Apart from rate limits, treat all exceptions as unrecoverable
+
             print(f"Exception: {e}")
 
             print("Send error response...", flush=True)
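From here on, every provider module converges on the same shape: catch the SDK's own throttling exception and re-raise the internal TooManyRequests, so that backoff lives in one place (the base Consumer loop) rather than being duplicated per provider. The pattern, distilled with a stand-in exception class in place of any real SDK's:

```python
from trustgraph.exceptions import TooManyRequests

class SdkRateLimit(Exception):
    """Stand-in for a provider SDK's rate-limit error."""

def call_model():
    raise SdkRateLimit()   # pretend the API returned a 429

def handle_once():
    try:
        return call_model()
    except SdkRateLimit:
        # Translate: the base Consumer.run() loop sleeps and retries
        raise TooManyRequests()
    except Exception as e:
        # Anything else is unrecoverable; the real modules send an
        # error response back to the requester at this point
        print("Exception:", e, flush=True)
```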
diff --git a/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py b/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py
index a3edb859..f5ecb8d6 100755
--- a/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py
+++ b/trustgraph-flow/trustgraph/model/text_completion/azure_openai/llm.py
@@ -4,10 +4,9 @@ Simple LLM service, performs text prompt completion using the Azure OpenAI
 endpoit service.  Input is prompt, output is response.
 """
 
-import requests
 import json
 from prometheus_client import Histogram
-from openai import AzureOpenAI
+from openai import AzureOpenAI, RateLimitError
 import os
 
 from .... schema import TextCompletionRequest, TextCompletionResponse, Error
@@ -126,30 +125,27 @@
             print(f"Output Tokens: {outputtokens}", flush=True)
             print("Send response...", flush=True)
 
-            r = TextCompletionResponse(response=resp.choices[0].message.content, error=None, in_token=inputtokens, out_token=outputtokens, model=self.model)
-
-            self.producer.send(r, properties={"id": id})
-
-        except TooManyRequests:
-
-            print("Send rate limit response...", flush=True)
-
             r = TextCompletionResponse(
-                error=Error(
-                    type = "rate-limit",
-                    message = str(e),
-                ),
-                response=None,
-                in_token=None,
-                out_token=None,
-                model=None,
+                response=resp.choices[0].message.content,
+                error=None,
+                in_token=inputtokens,
+                out_token=outputtokens,
+                model=self.model
             )
 
             self.producer.send(r, properties={"id": id})
 
-            self.consumer.acknowledge(msg)
+        except RateLimitError:
+
+            print("Rate limit...", flush=True)
+
+            # Leave rate limit retries to the base handler
+            raise TooManyRequests()
 
         except Exception as e:
 
+            # Apart from rate limits, treat all exceptions as unrecoverable
+
             print(f"Exception: {e}")
 
             print("Send error response...", flush=True)
diff --git a/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py b/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py
index 01ce837d..5cfd8907 100755
--- a/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py
+++ b/trustgraph-flow/trustgraph/model/text_completion/claude/llm.py
@@ -87,8 +87,6 @@
 
         try:
 
-            # FIXME: Rate limits?
-
             with __class__.text_completion_metric.time():
 
                 response = message = self.claude.messages.create(
@@ -117,34 +115,26 @@
             print(f"Output Tokens: {outputtokens}", flush=True)
             print("Send response...", flush=True)
 
-            r = TextCompletionResponse(response=resp, error=None, in_token=inputtokens, out_token=outputtokens, model=self.model)
+            r = TextCompletionResponse(
+                response=resp,
+                error=None,
+                in_token=inputtokens,
+                out_token=outputtokens,
+                model=self.model
+            )
 
             self.send(r, properties={"id": id})
 
             print("Done.", flush=True)
 
-        # FIXME: Wrong exception, don't know what this LLM throws
-        # for a rate limit
-        except TooManyRequests:
+        except anthropic.RateLimitError:
 
-            print("Send rate limit response...", flush=True)
-
-            r = TextCompletionResponse(
-                error=Error(
-                    type = "rate-limit",
-                    message = str(e),
-                ),
-                response=None,
-                in_token=None,
-                out_token=None,
-                model=None,
-            )
-
-            self.producer.send(r, properties={"id": id})
-
-            self.consumer.acknowledge(msg)
+            # Leave rate limit retries to the base handler
+            raise TooManyRequests()
 
         except Exception as e:
 
+            # Apart from rate limits, treat all exceptions as unrecoverable
+
             print(f"Exception: {e}")
 
             print("Send error response...", flush=True)
properties={"id": id}) - - self.consumer.acknowledge(msg) + # Leave rate limit retries to the base handler + raise TooManyRequests() except Exception as e: + # Apart from rate limits, treat all exceptions as unrecoverable + print(f"Exception: {e}") print("Send error response...", flush=True) diff --git a/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py b/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py index a249998d..5d5b23a0 100644 --- a/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/googleaistudio/llm.py @@ -88,7 +88,8 @@ class Processor(ConsumerProducer): HarmCategory.HARM_CATEGORY_HARASSMENT: block_level, HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: block_level, HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: block_level, - # There is a documentation conflict on whether or not CIVIC_INTEGRITY is a valid category + # There is a documentation conflict on whether or not + # CIVIC_INTEGRITY is a valid category # HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY: block_level, } @@ -122,8 +123,6 @@ class Processor(ConsumerProducer): try: - # FIXME: Rate limits? - with __class__.text_completion_metric.time(): chat_session = self.llm.start_chat( @@ -140,35 +139,30 @@ class Processor(ConsumerProducer): print(f"Output Tokens: {outputtokens}", flush=True) print("Send response...", flush=True) - r = TextCompletionResponse(response=resp, error=None, in_token=inputtokens, out_token=outputtokens, model=self.model) + r = TextCompletionResponse( + response=resp, + error=None, + in_token=inputtokens, + out_token=outputtokens, + model=self.model + ) self.send(r, properties={"id": id}) print("Done.", flush=True) - # FIXME: Wrong exception, don't know what this LLM throws - # for a rate limit except ResourceExhausted as e: - print("Send rate limit response...", flush=True) + print("Hit rate limit:", e, flush=True) - r = TextCompletionResponse( - error=Error( - type = "rate-limit", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, - ) - - self.producer.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + # Leave rate limit retries to the default handler + raise TooManyRequests() except Exception as e: - print(f"Exception: {e}") + # Apart from rate limits, treat all exceptions as unrecoverable + + print(type(e), flush=True) + print(f"Exception: {e}", flush=True) print("Send error response...", flush=True) diff --git a/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py b/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py index 274948a8..65a2b171 100755 --- a/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py +++ b/trustgraph-flow/trustgraph/model/text_completion/llamafile/llm.py @@ -126,26 +126,7 @@ class Processor(ConsumerProducer): print("Done.", flush=True) - # FIXME: Wrong exception, don't know what this LLM throws - # for a rate limit - except TooManyRequests: - - print("Send rate limit response...", flush=True) - - r = TextCompletionResponse( - error=Error( - type = "rate-limit", - message = str(e), - ), - response=None, - in_token=None, - out_token=None, - model=None, - ) - - self.producer.send(r, properties={"id": id}) - - self.consumer.acknowledge(msg) + # SLM, presumably there aren't rate limits except Exception as e: diff --git a/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py b/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py index 00d44f6d..8c5bd3dc 
diff --git a/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py b/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py
index 00d44f6d..8c5bd3dc 100755
--- a/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py
+++ b/trustgraph-flow/trustgraph/model/text_completion/ollama/llm.py
@@ -100,26 +100,7 @@
 
             print("Done.", flush=True)
 
-        # FIXME: Wrong exception, don't know what this LLM throws
-        # for a rate limit
-        except TooManyRequests:
-
-            print("Send rate limit response...", flush=True)
-
-            r = TextCompletionResponse(
-                error=Error(
-                    type = "rate-limit",
-                    message = str(e),
-                ),
-                response=None,
-                in_token=None,
-                out_token=None,
-                model=None,
-            )
-
-            self.producer.send(r, properties={"id": id})
-
-            self.consumer.acknowledge(msg)
+        # SLM, presumably no rate limits
 
         except Exception as e:
diff --git a/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py b/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py
index c874943e..c2b948d5 100755
--- a/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py
+++ b/trustgraph-flow/trustgraph/model/text_completion/openai/llm.py
@@ -4,7 +4,7 @@ Simple LLM service, performs text prompt completion using OpenAI.
 Input is prompt, output is response.
 """
 
-from openai import OpenAI
+from openai import OpenAI, RateLimitError
 from prometheus_client import Histogram
 import os
@@ -87,8 +87,6 @@
 
         try:
 
-            # FIXME: Rate limits
-
             with __class__.text_completion_metric.time():
 
                 resp = self.openai.chat.completions.create(
@@ -134,27 +132,13 @@
 
-        # FIXME: Wrong exception, don't know what this LLM throws
-        # for a rate limit
-        except TooManyRequests:
+        except RateLimitError:
 
-            print("Send rate limit response...", flush=True)
-
-            r = TextCompletionResponse(
-                error=Error(
-                    type = "rate-limit",
-                    message = str(e),
-                ),
-                response=None,
-                in_token=None,
-                out_token=None,
-                model=None,
-            )
-
-            self.producer.send(r, properties={"id": id})
-
-            self.consumer.acknowledge(msg)
+            # Leave rate limit retries to the base handler
+            raise TooManyRequests()
 
         except Exception as e:
 
+            # Apart from rate limits, treat all exceptions as unrecoverable
+
             print(f"Exception: {e}")
 
             print("Send error response...", flush=True)
diff --git a/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py b/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py
index cb817836..d6a2efec 100755
--- a/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py
+++ b/trustgraph-vertexai/trustgraph/model/text_completion/vertexai/llm.py
@@ -178,25 +178,15 @@
 
         except google.api_core.exceptions.ResourceExhausted as e:
 
-            print("Send rate limit response...", flush=True)
+            print("Hit rate limit:", e, flush=True)
 
-            r = TextCompletionResponse(
-                error=Error(
-                    type = "rate-limit",
-                    message = str(e),
-                ),
-                response=None,
-                in_token=None,
-                out_token=None,
-                model=None,
-            )
-
-            self.producer.send(r, properties={"id": id})
-
-            self.consumer.acknowledge(msg)
+            # Leave rate limit retries to the base handler
+            raise TooManyRequests()
 
         except Exception as e:
 
+            # Apart from rate limits, treat all exceptions as unrecoverable
+
             print(f"Exception: {e}")
 
             print("Send error response...", flush=True)
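The two new flags ride along wherever Consumer.add_args() (or the ConsumerProducer variant that delegates to it) is used. A quick self-contained check of the argparse wiring; the default queue and subscriber names here are placeholders:

```python
import argparse

from trustgraph.base.consumer import Consumer

parser = argparse.ArgumentParser()
Consumer.add_args(parser, "example-input", "example-subscriber")

args = parser.parse_args(
    ["--rate-limit-retry", "30", "--rate-limit-timeout", "3600"]
)

# argparse maps --rate-limit-retry to args.rate_limit_retry
assert args.rate_limit_retry == 30
assert args.rate_limit_timeout == 3600
```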