mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-26 08:56:21 +02:00
Refactor rate limit handling (#280)
* - Refactored retry for rate limits into the base class - ConsumerProducer is derived from Consumer to simplify code - Added rate_limit_count metrics for rate limit events * Add rate limit events to VertexAI and Google AI Studio * Added Grafana rate limit dashboard * Add rate limit handling to all LLMs
This commit is contained in:
parent
26a586034c
commit
0e03bc05a4
14 changed files with 174 additions and 298 deletions
|
|
@ -7,6 +7,9 @@ import time
|
|||
from . base_processor import BaseProcessor
|
||||
from .. exceptions import TooManyRequests
|
||||
|
||||
default_rate_limit_retry = 10
|
||||
default_rate_limit_timeout = 7200
|
||||
|
||||
class Consumer(BaseProcessor):
|
||||
|
||||
def __init__(self, **params):
|
||||
|
|
@ -22,11 +25,18 @@ class Consumer(BaseProcessor):
|
|||
|
||||
super(Consumer, self).__init__(**params)
|
||||
|
||||
input_queue = params.get("input_queue")
|
||||
subscriber = params.get("subscriber")
|
||||
input_schema = params.get("input_schema")
|
||||
self.input_queue = params.get("input_queue")
|
||||
self.subscriber = params.get("subscriber")
|
||||
self.input_schema = params.get("input_schema")
|
||||
|
||||
if input_schema == None:
|
||||
self.rate_limit_retry = params.get(
|
||||
"rate_limit_retry", default_rate_limit_retry
|
||||
)
|
||||
self.rate_limit_timeout = params.get(
|
||||
"rate_limit_timeout", default_rate_limit_timeout
|
||||
)
|
||||
|
||||
if self.input_schema == None:
|
||||
raise RuntimeError("input_schema must be specified")
|
||||
|
||||
if not hasattr(__class__, "request_metric"):
|
||||
|
|
@ -44,18 +54,27 @@ class Consumer(BaseProcessor):
|
|||
'processing_count', 'Processing count', ["status"]
|
||||
)
|
||||
|
||||
if not hasattr(__class__, "rate_limit_metric"):
|
||||
__class__.rate_limit_metric = Counter(
|
||||
'rate_limit_count', 'Rate limit event count',
|
||||
)
|
||||
|
||||
__class__.pubsub_metric.info({
|
||||
"input_queue": input_queue,
|
||||
"subscriber": subscriber,
|
||||
"input_schema": input_schema.__name__,
|
||||
"input_queue": self.input_queue,
|
||||
"subscriber": self.subscriber,
|
||||
"input_schema": self.input_schema.__name__,
|
||||
"rate_limit_retry": str(self.rate_limit_retry),
|
||||
"rate_limit_timeout": str(self.rate_limit_timeout),
|
||||
})
|
||||
|
||||
self.consumer = self.client.subscribe(
|
||||
input_queue, subscriber,
|
||||
self.input_queue, self.subscriber,
|
||||
consumer_type=pulsar.ConsumerType.Shared,
|
||||
schema=JsonSchema(input_schema),
|
||||
schema=JsonSchema(self.input_schema),
|
||||
)
|
||||
|
||||
print("Initialised consumer.", flush=True)
|
||||
|
||||
def run(self):
|
||||
|
||||
__class__.state_metric.state('running')
|
||||
|
|
@ -64,31 +83,61 @@ class Consumer(BaseProcessor):
|
|||
|
||||
msg = self.consumer.receive()
|
||||
|
||||
try:
|
||||
expiry = time.time() + self.rate_limit_timeout
|
||||
|
||||
with __class__.request_metric.time():
|
||||
self.handle(msg)
|
||||
# This loop is for retry on rate-limit / resource limits
|
||||
while True:
|
||||
|
||||
# Acknowledge successful processing of the message
|
||||
self.consumer.acknowledge(msg)
|
||||
if time.time() > expiry:
|
||||
|
||||
__class__.processing_metric.labels(status="success").inc()
|
||||
print("Gave up waiting for rate-limit retry", flush=True)
|
||||
|
||||
except TooManyRequests:
|
||||
self.consumer.negative_acknowledge(msg)
|
||||
print("TooManyRequests: will retry")
|
||||
__class__.processing_metric.labels(status="rate-limit").inc()
|
||||
time.sleep(5)
|
||||
continue
|
||||
# Message failed to be processed, this causes it to
|
||||
# be retried
|
||||
self.consumer.negative_acknowledge(msg)
|
||||
|
||||
__class__.processing_metric.labels(status="error").inc()
|
||||
|
||||
# Break out of retry loop, processes next message
|
||||
break
|
||||
|
||||
try:
|
||||
|
||||
with __class__.request_metric.time():
|
||||
self.handle(msg)
|
||||
|
||||
# Acknowledge successful processing of the message
|
||||
self.consumer.acknowledge(msg)
|
||||
|
||||
__class__.processing_metric.labels(status="success").inc()
|
||||
|
||||
# Break out of retry loop
|
||||
break
|
||||
|
||||
except TooManyRequests:
|
||||
|
||||
print("TooManyRequests: will retry...", flush=True)
|
||||
|
||||
__class__.rate_limit_metric.inc()
|
||||
|
||||
# Sleep
|
||||
time.sleep(self.rate_limit_retry)
|
||||
|
||||
# Contine from retry loop, just causes a reprocessing
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
except Exception as e:
|
||||
|
||||
print("Exception:", e, flush=True)
|
||||
print("Exception:", e, flush=True)
|
||||
|
||||
# Message failed to be processed
|
||||
self.consumer.negative_acknowledge(msg)
|
||||
# Message failed to be processed, this causes it to
|
||||
# be retried
|
||||
self.consumer.negative_acknowledge(msg)
|
||||
|
||||
__class__.processing_metric.labels(status="error").inc()
|
||||
__class__.processing_metric.labels(status="error").inc()
|
||||
|
||||
# Break out of retry loop, processes next message
|
||||
break
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser, default_input_queue, default_subscriber):
|
||||
|
|
@ -107,3 +156,17 @@ class Consumer(BaseProcessor):
|
|||
help=f'Queue subscriber name (default: {default_subscriber})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--rate-limit-retry',
|
||||
type=int,
|
||||
default=default_rate_limit_retry,
|
||||
help=f'Rate limit retry (default: {default_rate_limit_retry})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--rate-limit-timeout',
|
||||
type=int,
|
||||
default=default_rate_limit_timeout,
|
||||
help=f'Rate limit timeout (default: {default_rate_limit_timeout})'
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -4,111 +4,43 @@ import pulsar
|
|||
from prometheus_client import Histogram, Info, Counter, Enum
|
||||
import time
|
||||
|
||||
from . base_processor import BaseProcessor
|
||||
from . consumer import Consumer
|
||||
from .. exceptions import TooManyRequests
|
||||
|
||||
# FIXME: Derive from consumer? And producer?
|
||||
|
||||
class ConsumerProducer(BaseProcessor):
|
||||
class ConsumerProducer(Consumer):
|
||||
|
||||
def __init__(self, **params):
|
||||
|
||||
if not hasattr(__class__, "state_metric"):
|
||||
__class__.state_metric = Enum(
|
||||
'processor_state', 'Processor state',
|
||||
states=['starting', 'running', 'stopped']
|
||||
)
|
||||
__class__.state_metric.state('starting')
|
||||
super(ConsumerProducer, self).__init__(**params)
|
||||
|
||||
__class__.state_metric.state('starting')
|
||||
|
||||
input_queue = params.get("input_queue")
|
||||
output_queue = params.get("output_queue")
|
||||
subscriber = params.get("subscriber")
|
||||
input_schema = params.get("input_schema")
|
||||
output_schema = params.get("output_schema")
|
||||
|
||||
if not hasattr(__class__, "request_metric"):
|
||||
__class__.request_metric = Histogram(
|
||||
'request_latency', 'Request latency (seconds)'
|
||||
)
|
||||
self.output_queue = params.get("output_queue")
|
||||
self.output_schema = params.get("output_schema")
|
||||
|
||||
if not hasattr(__class__, "output_metric"):
|
||||
__class__.output_metric = Counter(
|
||||
'output_count', 'Output items created'
|
||||
)
|
||||
|
||||
if not hasattr(__class__, "pubsub_metric"):
|
||||
__class__.pubsub_metric = Info(
|
||||
'pubsub', 'Pub/sub configuration'
|
||||
)
|
||||
|
||||
if not hasattr(__class__, "processing_metric"):
|
||||
__class__.processing_metric = Counter(
|
||||
'processing_count', 'Processing count', ["status"]
|
||||
)
|
||||
|
||||
__class__.pubsub_metric.info({
|
||||
"input_queue": input_queue,
|
||||
"output_queue": output_queue,
|
||||
"subscriber": subscriber,
|
||||
"input_schema": input_schema.__name__,
|
||||
"output_schema": output_schema.__name__,
|
||||
"input_queue": self.input_queue,
|
||||
"output_queue": self.output_queue,
|
||||
"subscriber": self.subscriber,
|
||||
"input_schema": self.input_schema.__name__,
|
||||
"output_schema": self.output_schema.__name__,
|
||||
"rate_limit_retry": str(self.rate_limit_retry),
|
||||
"rate_limit_timeout": str(self.rate_limit_timeout),
|
||||
})
|
||||
|
||||
super(ConsumerProducer, self).__init__(**params)
|
||||
|
||||
if input_schema == None:
|
||||
raise RuntimeError("input_schema must be specified")
|
||||
|
||||
if output_schema == None:
|
||||
if self.output_schema == None:
|
||||
raise RuntimeError("output_schema must be specified")
|
||||
|
||||
self.producer = self.client.create_producer(
|
||||
topic=output_queue,
|
||||
schema=JsonSchema(output_schema),
|
||||
topic=self.output_queue,
|
||||
schema=JsonSchema(self.output_schema),
|
||||
chunking_enabled=True,
|
||||
)
|
||||
|
||||
self.consumer = self.client.subscribe(
|
||||
input_queue, subscriber,
|
||||
consumer_type=pulsar.ConsumerType.Shared,
|
||||
schema=JsonSchema(input_schema),
|
||||
)
|
||||
|
||||
def run(self):
|
||||
|
||||
__class__.state_metric.state('running')
|
||||
|
||||
while True:
|
||||
|
||||
msg = self.consumer.receive()
|
||||
|
||||
try:
|
||||
|
||||
with __class__.request_metric.time():
|
||||
resp = self.handle(msg)
|
||||
|
||||
# Acknowledge successful processing of the message
|
||||
self.consumer.acknowledge(msg)
|
||||
|
||||
__class__.processing_metric.labels(status="success").inc()
|
||||
|
||||
except TooManyRequests:
|
||||
self.consumer.negative_acknowledge(msg)
|
||||
print("TooManyRequests: will retry")
|
||||
__class__.processing_metric.labels(status="rate-limit").inc()
|
||||
time.sleep(5)
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
|
||||
print("Exception:", e, flush=True)
|
||||
|
||||
# Message failed to be processed
|
||||
self.consumer.negative_acknowledge(msg)
|
||||
|
||||
__class__.processing_metric.labels(status="error").inc()
|
||||
print("Initialised consumer/producer.")
|
||||
|
||||
def send(self, msg, properties={}):
|
||||
self.producer.send(msg, properties)
|
||||
|
|
@ -120,19 +52,7 @@ class ConsumerProducer(BaseProcessor):
|
|||
default_output_queue,
|
||||
):
|
||||
|
||||
BaseProcessor.add_args(parser)
|
||||
|
||||
parser.add_argument(
|
||||
'-i', '--input-queue',
|
||||
default=default_input_queue,
|
||||
help=f'Input queue (default: {default_input_queue})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-s', '--subscriber',
|
||||
default=default_subscriber,
|
||||
help=f'Queue subscriber name (default: {default_subscriber})'
|
||||
)
|
||||
Consumer.add_args(parser, default_input_queue, default_subscriber)
|
||||
|
||||
parser.add_argument(
|
||||
'-o', '--output-queue',
|
||||
|
|
|
|||
|
|
@ -8,7 +8,3 @@ class LlmError(Exception):
|
|||
class ParseError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue