Rate-limit exception handling framework, but not much implementation yet

This commit is contained in:
Cyber MacGeddon 2024-08-17 18:41:25 +01:00
parent 25f557d8a5
commit 1294826aff
8 changed files with 35 additions and 11 deletions

View file

@ -1,8 +1,10 @@
from pulsar.schema import JsonSchema
from prometheus_client import start_http_server, Histogram, Info, Counter
import time
from . base_processor import BaseProcessor
from .. exceptions import TooManyRequests
class Consumer(BaseProcessor):
@ -59,6 +61,13 @@ class Consumer(BaseProcessor):
__class__.processing_metric.labels(status="success").inc()
except TooManyRequests:
self.consumer.negative_acknowledge(msg)
print("TooManyRequests: will retry")
__class__.processing_metric.labels(status="rate-limit").inc()
time.sleep(15)
continue
except Exception as e:
print("Exception:", e, flush=True)

View file

@ -12,6 +12,7 @@ from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests
module = ".".join(__name__.split(".")[1:-1])
@ -76,6 +77,10 @@ class Processor(ConsumerProducer):
}
resp = requests.post(url, data=body, headers=headers)
if resp.status_code == 429:
raise TooManyRequests()
result = resp.json()
message_content = result['choices'][0]['message']['content']

View file

@ -13,6 +13,7 @@ from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests
module = ".".join(__name__.split(".")[1:-1])
@ -121,6 +122,8 @@ class Processor(ConsumerProducer):
accept = 'application/json'
contentType = 'application/json'
# FIXME: Consider catching request limits and raise TooManyRequests
# See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
response = self.bedrock.invoke_model(body=promptbody, modelId=self.model, accept=accept, contentType=contentType)
# Mistral Response Structure

View file

@ -57,6 +57,8 @@ class Processor(ConsumerProducer):
print(f"Handling prompt {id}...", flush=True)
prompt = v.prompt
# FIXME: Rate limits?
response = message = self.claude.messages.create(
model=self.model,
max_tokens=1000,

View file

@ -59,15 +59,16 @@ class Processor(ConsumerProducer):
prompt = v.prompt
# FIXME: Deal with rate limits?
output = self.cohere.chat(
model=self.model,
message=prompt,
preamble = "You are a helpful AI-assistant.",
temperature=0.0,
chat_history=[],
prompt_truncation='auto',
connectors=[]
)
model=self.model,
message=prompt,
preamble = "You are a helpful AI-assistant.",
temperature=0.0,
chat_history=[],
prompt_truncation='auto',
connectors=[]
)
resp = output.text
print(resp, flush=True)

View file

@ -65,6 +65,8 @@ class Processor(ConsumerProducer):
print(f"Handling prompt {id}...", flush=True)
prompt = v.prompt
# FIXME: Rate limits?
response = self.llm.invoke(prompt)
print("Send response...", flush=True)

View file

@ -57,6 +57,8 @@ class Processor(ConsumerProducer):
print(f"Handling prompt {id}...", flush=True)
prompt = v.prompt
# FIXME: Rate limits
resp = self.openai.chat.completions.create(
model=self.model,
messages=[

View file

@ -26,6 +26,7 @@ from .... schema import text_completion_request_queue
from .... schema import text_completion_response_queue
from .... log_level import LogLevel
from .... base import ConsumerProducer
from .... exceptions import TooManyRequests
module = ".".join(__name__.split(".")[1:-1])
@ -139,9 +140,8 @@ class Processor(ConsumerProducer):
except google.api_core.exceptions.ResourceExhausted:
print("429, resource busy, sleeping", flush=True)
time.sleep(15)
self.consumer.negative_acknowledge(msg)
# 429 / rate limits case
raise TooManyRequests
# Let other exceptions fall through