Rate limit handling (#11)

* Added a rate limit exception
* Reduce request/response timeouts because looks like there are major issues
* Add rate limit exception catch to all consumers
* Version to 0.6.3
This commit is contained in:
cybermaggedon 2024-08-19 22:15:32 +01:00 committed by GitHub
parent 25f557d8a5
commit a38f530c5f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 188 additions and 152 deletions

View file

@ -1,8 +1,10 @@
from pulsar.schema import JsonSchema
from prometheus_client import start_http_server, Histogram, Info, Counter
import time
from . base_processor import BaseProcessor
from .. exceptions import TooManyRequests
class Consumer(BaseProcessor):
@ -59,6 +61,13 @@ class Consumer(BaseProcessor):
__class__.processing_metric.labels(status="success").inc()
except TooManyRequests:
self.consumer.negative_acknowledge(msg)
print("TooManyRequests: will retry")
__class__.processing_metric.labels(status="rate-limit").inc()
time.sleep(5)
continue
except Exception as e:
print("Exception:", e, flush=True)

View file

@ -1,8 +1,10 @@
from pulsar.schema import JsonSchema
from prometheus_client import Histogram, Info, Counter
import time
from . base_processor import BaseProcessor
from .. exceptions import TooManyRequests
# FIXME: Derive from consumer? And producer?
@ -78,6 +80,13 @@ class ConsumerProducer(BaseProcessor):
__class__.processing_metric.labels(status="success").inc()
except TooManyRequests:
self.consumer.negative_acknowledge(msg)
print("TooManyRequests: will retry")
__class__.processing_metric.labels(status="rate-limit").inc()
time.sleep(5)
continue
except Exception as e:
print("Exception:", e, flush=True)

View file

@ -51,7 +51,7 @@ class EmbeddingsClient:
schema=JsonSchema(EmbeddingsResponse),
)
def request(self, text, timeout=500):
def request(self, text, timeout=10):
id = str(uuid.uuid4())

4
trustgraph/exceptions.py Normal file
View file

@ -0,0 +1,4 @@
class TooManyRequests(Exception):
pass

View file

@ -51,7 +51,7 @@ class LlmClient:
schema=JsonSchema(TextCompletionResponse),
)
def request(self, prompt, timeout=500):
def request(self, prompt, timeout=30):
id = str(uuid.uuid4())

View file

@ -12,6 +12,7 @@ from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests
module = ".".join(__name__.split(".")[1:-1])
@ -76,6 +77,10 @@ class Processor(ConsumerProducer):
}
resp = requests.post(url, data=body, headers=headers)
if resp.status_code == 429:
raise TooManyRequests()
result = resp.json()
message_content = result['choices'][0]['message']['content']

View file

@ -13,6 +13,7 @@ from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests
module = ".".join(__name__.split(".")[1:-1])
@ -121,6 +122,8 @@ class Processor(ConsumerProducer):
accept = 'application/json'
contentType = 'application/json'
# FIXME: Consider catching request limits and raise TooManyRequests
# See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
response = self.bedrock.invoke_model(body=promptbody, modelId=self.model, accept=accept, contentType=contentType)
# Mistral Response Structure

View file

@ -57,6 +57,8 @@ class Processor(ConsumerProducer):
print(f"Handling prompt {id}...", flush=True)
prompt = v.prompt
# FIXME: Rate limits?
response = message = self.claude.messages.create(
model=self.model,
max_tokens=1000,

View file

@ -59,15 +59,16 @@ class Processor(ConsumerProducer):
prompt = v.prompt
# FIXME: Deal with rate limits?
output = self.cohere.chat(
model=self.model,
message=prompt,
preamble = "You are a helpful AI-assistant.",
temperature=0.0,
chat_history=[],
prompt_truncation='auto',
connectors=[]
)
model=self.model,
message=prompt,
preamble = "You are a helpful AI-assistant.",
temperature=0.0,
chat_history=[],
prompt_truncation='auto',
connectors=[]
)
resp = output.text
print(resp, flush=True)

View file

@ -65,6 +65,8 @@ class Processor(ConsumerProducer):
print(f"Handling prompt {id}...", flush=True)
prompt = v.prompt
# FIXME: Rate limits?
response = self.llm.invoke(prompt)
print("Send response...", flush=True)

View file

@ -57,6 +57,8 @@ class Processor(ConsumerProducer):
print(f"Handling prompt {id}...", flush=True)
prompt = v.prompt
# FIXME: Rate limits
resp = self.openai.chat.completions.create(
model=self.model,
messages=[

View file

@ -26,6 +26,7 @@ from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests
module = ".".join(__name__.split(".")[1:-1])
@ -139,9 +140,8 @@ class Processor(ConsumerProducer):
except google.api_core.exceptions.ResourceExhausted:
print("429, resource busy, sleeping", flush=True)
time.sleep(15)
self.consumer.negative_acknowledge(msg)
# 429 / rate limits case
raise TooManyRequests
# Let other exceptions fall through

View file

@ -51,7 +51,7 @@ class PromptClient:
schema=JsonSchema(PromptResponse),
)
def request_definitions(self, chunk, timeout=500):
def request_definitions(self, chunk, timeout=30):
id = str(uuid.uuid4())