diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py
index 162e10eb..8b7b2b0d 100644
--- a/trustgraph-base/trustgraph/base/consumer.py
+++ b/trustgraph-base/trustgraph/base/consumer.py
@@ -1,4 +1,14 @@
+# Consumer is similar to subscriber: it takes information from a queue
+# and passes it on to a processor function. This is the main receiving
+# loop for TrustGraph processors. It incorporates retry functionality.
+
+# Note: there is a 'defect' in the system which is tolerated: although
+# the processing handlers are async functions, ideally the implementation
+# would use all-async code. In practice, if the processor implements only
+# one handler, and a single thread of concurrency, nothing too outrageous
+# will happen if synchronous / blocking code is used.
+
 from pulsar.schema import JsonSchema
 import pulsar
 import _pulsar
 
@@ -16,6 +26,7 @@ class Consumer:
             start_of_messages=False,
             rate_limit_retry_time = 10, rate_limit_timeout = 7200,
             reconnect_time = 5,
+            concurrency = 1,    # Number of concurrent requests to handle
     ):
 
         self.taskgroup = taskgroup
@@ -34,7 +45,9 @@ class Consumer:
         self.start_of_messages = start_of_messages
 
         self.running = True
-        self.task = None
+        self.consumer_task = None
+
+        self.concurrency = concurrency
 
         self.metrics = metrics
 
@@ -52,7 +65,11 @@ class Consumer:
 
     async def stop(self):
 
         self.running = False
-        await self.task
+
+        if self.consumer_task:
+            await self.consumer_task
+
+        self.consumer_task = None
 
     async def start(self):
@@ -62,9 +79,9 @@ class Consumer:
         if self.metrics:
             self.metrics.state("stopped")
 
-        self.task = self.taskgroup.create_task(self.run())
+        self.consumer_task = self.taskgroup.create_task(self.consumer_run())
 
-    async def run(self):
+    async def consumer_run(self):
 
         while self.running:
@@ -102,7 +119,19 @@ class Consumer:
 
             try:
 
-                await self.consume()
+                print(
+                    "Starting", self.concurrency, "receiver tasks",
+                    flush=True
+                )
+
+                async with asyncio.TaskGroup() as tg:
+
+                    tasks = []
+
+                    for i in range(0, self.concurrency):
+                        tasks.append(
+                            tg.create_task(self.consume_from_queue())
+                        )
 
                 if self.metrics:
                     self.metrics.state("stopped")
@@ -120,7 +149,7 @@ class Consumer:
                     self.consumer.unsubscribe()
                     self.consumer.close()
 
-    async def consume(self):
+    async def consume_from_queue(self):
 
         while self.running:
@@ -134,71 +163,75 @@ class Consumer:
             except Exception as e:
                 raise e
 
-            expiry = time.time() + self.rate_limit_timeout
+            await self.handle_one_from_queue(msg)
 
-            # This loop is for retry on rate-limit / resource limits
-            while self.running:
+    async def handle_one_from_queue(self, msg):
 
-                if time.time() > expiry:
+        expiry = time.time() + self.rate_limit_timeout
 
-                    print("Gave up waiting for rate-limit retry", flush=True)
+        # This loop is for retry on rate-limit / resource limits
+        while self.running:
 
-                    # Message failed to be processed, this causes it to
-                    # be retried
-                    self.consumer.negative_acknowledge(msg)
+            if time.time() > expiry:
 
-                    if self.metrics:
-                        self.metrics.process("error")
+                print("Gave up waiting for rate-limit retry", flush=True)
 
-                    # Break out of retry loop, processes next message
-                    break
+                # Message failed to be processed, this causes it to
+                # be retried
+                self.consumer.negative_acknowledge(msg)
 
-                try:
+                if self.metrics:
+                    self.metrics.process("error")
 
-                    print("Handle...", flush=True)
+                # Break out of retry loop, process next message
+                break
 
-                    if self.metrics:
+            try:
 
-                        with self.metrics.record_time():
-                            await self.handler(msg, self, self.flow)
+                print("Handle...", flush=True)
 
-                    else:
-                        await self.handler(msg, self, self.flow)
+                if self.metrics:
+
+                    with self.metrics.record_time():
+                        await self.handler(msg, self, self.flow)
 
-                    print("Handled.", flush=True)
+                else:
+                    await self.handler(msg, self, self.flow)
 
-                    # Acknowledge successful processing of the message
-                    self.consumer.acknowledge(msg)
+                print("Handled.", flush=True)
 
-                    if self.metrics:
-                        self.metrics.process("success")
+                # Acknowledge successful processing of the message
+                self.consumer.acknowledge(msg)
 
-                    # Break out of retry loop
-                    break
+                if self.metrics:
+                    self.metrics.process("success")
 
-                except TooManyRequests:
+                # Break out of retry loop
+                break
 
-                    print("TooManyRequests: will retry...", flush=True)
+            except TooManyRequests:
 
-                    if self.metrics:
-                        self.metrics.rate_limit()
+                print("TooManyRequests: will retry...", flush=True)
 
-                    # Sleep
-                    await asyncio.sleep(self.rate_limit_retry_time)
+                if self.metrics:
+                    self.metrics.rate_limit()
 
-                    # Contine from retry loop, just causes a reprocessing
-                    continue
+                # Sleep
+                await asyncio.sleep(self.rate_limit_retry_time)
 
-                except Exception as e:
+                # Continue from retry loop, just causes a reprocessing
+                continue
 
-                    print("consume exception:", e, flush=True)
+            except Exception as e:
 
-                    # Message failed to be processed, this causes it to
-                    # be retried
-                    self.consumer.negative_acknowledge(msg)
+                print("consume exception:", e, flush=True)
 
-                    if self.metrics:
-                        self.metrics.process("error")
+                # Message failed to be processed, this causes it to
+                # be retried
+                self.consumer.negative_acknowledge(msg)
 
-                    # Break out of retry loop, processes next message
-                    break
+                if self.metrics:
+                    self.metrics.process("error")
+
+                # Break out of retry loop, process next message
+                break
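For illustration, a minimal self-contained sketch of the fan-out pattern the consumer.py change introduces: one task group running `concurrency` receiver loops that all pull from the same queue. An `asyncio.Queue` stands in for the Pulsar consumer, and all names here are illustrative; `asyncio.TaskGroup` requires Python 3.11+.

```python
import asyncio

# Stand-in for the Pulsar consumer: every receiver loop awaits get()
# on the same queue, so messages are spread across receivers.
async def consume_from_queue(queue, worker):
    while True:
        msg = await queue.get()
        if msg is None:             # sentinel: shut this receiver down
            return
        # Simulate an I/O-bound handler; with concurrency > 1 these
        # sleeps overlap instead of running back-to-back
        await asyncio.sleep(0.1)
        print(f"receiver {worker} handled {msg}", flush=True)

async def main(concurrency=4):
    queue = asyncio.Queue()
    for i in range(10):
        queue.put_nowait(f"msg-{i}")
    for _ in range(concurrency):
        queue.put_nowait(None)      # one sentinel per receiver

    # Mirrors consumer_run(): one TaskGroup, N receiver tasks; the
    # 'async with' block exits only once every receiver has returned
    async with asyncio.TaskGroup() as tg:
        for w in range(concurrency):
            tg.create_task(consume_from_queue(queue, w))

asyncio.run(main())
```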
diff --git a/trustgraph-base/trustgraph/base/consumer_spec.py b/trustgraph-base/trustgraph/base/consumer_spec.py
index 93665476..89581b02 100644
--- a/trustgraph-base/trustgraph/base/consumer_spec.py
+++ b/trustgraph-base/trustgraph/base/consumer_spec.py
@@ -4,10 +4,11 @@ from . consumer import Consumer
 from . spec import Spec
 
 class ConsumerSpec(Spec):
-    def __init__(self, name, schema, handler):
+    def __init__(self, name, schema, handler, concurrency = 1):
         self.name = name
         self.schema = schema
         self.handler = handler
+        self.concurrency = concurrency
 
     def add(self, flow, processor, definition):
 
@@ -24,6 +25,7 @@ class ConsumerSpec(Spec):
             schema = self.schema,
             handler = self.handler,
             metrics = consumer_metrics,
+            concurrency = self.concurrency
         )
 
         # Consumer handle gets access to producers and other
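With `ConsumerSpec` now forwarding `concurrency` to the `Consumer` it builds, a processor opts in at registration time. A hypothetical sketch of that wiring — the `MyService` and `MyRequest` names are invented for illustration; the pattern mirrors the LlmService change below:

```python
from pulsar.schema import Record, String
from trustgraph.base import FlowProcessor, ConsumerSpec

# Hypothetical request schema, just to make the example concrete
class MyRequest(Record):
    text = String()

class MyService(FlowProcessor):

    def __init__(self, **params):
        concurrency = params.get("concurrency", 1)
        super(MyService, self).__init__(**params)

        # ConsumerSpec passes concurrency through to the Consumer it
        # constructs in add(), so N receiver tasks share this queue
        self.register_specification(
            ConsumerSpec(
                name = "request",
                schema = MyRequest,
                handler = self.on_request,
                concurrency = concurrency,
            )
        )

    # Handler signature matches how Consumer invokes it:
    # await self.handler(msg, self, self.flow)
    async def on_request(self, msg, consumer, flow):
        ...
```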
diff --git a/trustgraph-base/trustgraph/base/llm_service.py b/trustgraph-base/trustgraph/base/llm_service.py
index c79b819b..627bcbb4 100644
--- a/trustgraph-base/trustgraph/base/llm_service.py
+++ b/trustgraph-base/trustgraph/base/llm_service.py
@@ -11,9 +11,13 @@ from .. exceptions import TooManyRequests
 from .. base import FlowProcessor, ConsumerSpec, ProducerSpec
 
 default_ident = "text-completion"
+default_concurrency = 1
 
 class LlmResult:
-    def __init__(self, text=None, in_token=None, out_token=None, model=None):
+    def __init__(
+        self, text = None, in_token = None, out_token = None,
+        model = None,
+    ):
         self.text = text
         self.in_token = in_token
         self.out_token = out_token
@@ -25,14 +29,19 @@ class LlmService(FlowProcessor):
     def __init__(self, **params):
 
         id = params.get("id")
+        concurrency = params.get("concurrency", default_concurrency)
 
-        super(LlmService, self).__init__(**params | { "id": id })
+        super(LlmService, self).__init__(**params | {
+            "id": id,
+            "concurrency": concurrency,
+        })
 
         self.register_specification(
             ConsumerSpec(
                 name = "request",
                 schema = TextCompletionRequest,
-                handler = self.on_request
+                handler = self.on_request,
+                concurrency = concurrency,
             )
         )
 
@@ -115,5 +124,12 @@ class LlmService(FlowProcessor):
     @staticmethod
     def add_args(parser):
 
+        parser.add_argument(
+            '-c', '--concurrency',
+            type=int,
+            default=default_concurrency,
+            help=f'Number of concurrent requests (default: {default_concurrency})'
+        )
+
         FlowProcessor.add_args(parser)
diff --git a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py
index 8467b0bf..6e79adab 100644
--- a/trustgraph-base/trustgraph/base/subscriber.py
+++ b/trustgraph-base/trustgraph/base/subscriber.py
@@ -1,4 +1,8 @@
+# Subscriber is similar to consumer: it provides a service that takes items
+# off a queue and makes them available through an internal broker system,
+# so it is suitable for when multiple recipients read from the same queue.
+
 from pulsar.schema import JsonSchema
 import asyncio
 import _pulsar
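The 'defect' note at the top of consumer.py matters more once concurrency > 1: a handler that blocks the event loop serializes every receiver task, while a properly async handler lets them overlap. A standalone timing sketch (illustrative only; handler names are invented, and `asyncio.TaskGroup` needs Python 3.11+):

```python
import asyncio
import time

async def blocking_handler(msg):
    time.sleep(0.2)              # blocks the whole event loop

async def async_handler(msg):
    await asyncio.sleep(0.2)     # yields, so other tasks proceed

# Run n handler invocations concurrently and report wall-clock time
async def timed(handler, n=4):
    t0 = time.monotonic()
    async with asyncio.TaskGroup() as tg:
        for i in range(n):
            tg.create_task(handler(i))
    return time.monotonic() - t0

async def main():
    # Four concurrent "receivers": roughly 0.8s when each handler
    # blocks, roughly 0.2s when the handlers are genuinely async
    print("blocking:", round(await timed(blocking_handler), 2), "s")
    print("async:   ", round(await timed(async_handler), 2), "s")

asyncio.run(main())
```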