Add multi-threading to consumer base-class and all LLMs (#408)

* Concurrency in consumers

* Add concurrency to consumer spec

* Add concurrency command-line option to all LLMs (default 1)
This commit is contained in:
cybermaggedon 2025-06-04 10:49:56 +01:00 committed by GitHub
parent 083702d3d4
commit e10e9d2295
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 107 additions and 52 deletions

View file

@ -1,4 +1,14 @@
# Consumer is similar to subscriber: It takes information from a queue
# and passes on to a processor function. This is the main receiving
# loop for TrustGraph processors. Incorporates retry functionality.
# Note: there is a 'defect' in the system which is tolerated: although
# the processing handlers are async functions, and ideally the
# implementation would use all async code, in practice if the processor
# only implements one handler, and a single thread of concurrency, nothing
# too outrageous will happen if synchronous / blocking code is used.
from pulsar.schema import JsonSchema from pulsar.schema import JsonSchema
import pulsar import pulsar
import _pulsar import _pulsar
@ -16,6 +26,7 @@ class Consumer:
start_of_messages=False, start_of_messages=False,
rate_limit_retry_time = 10, rate_limit_timeout = 7200, rate_limit_retry_time = 10, rate_limit_timeout = 7200,
reconnect_time = 5, reconnect_time = 5,
concurrency = 1, # Number of concurrent requests to handle
): ):
self.taskgroup = taskgroup self.taskgroup = taskgroup
@ -34,7 +45,9 @@ class Consumer:
self.start_of_messages = start_of_messages self.start_of_messages = start_of_messages
self.running = True self.running = True
self.task = None self.consumer_task = None
self.concurrency = concurrency
self.metrics = metrics self.metrics = metrics
@ -52,7 +65,11 @@ class Consumer:
async def stop(self): async def stop(self):
self.running = False self.running = False
await self.task
if self.consumer_task:
await self.consumer_task
self.consumer_task = None
async def start(self): async def start(self):
@ -62,9 +79,9 @@ class Consumer:
if self.metrics: if self.metrics:
self.metrics.state("stopped") self.metrics.state("stopped")
self.task = self.taskgroup.create_task(self.run()) self.consumer_task = self.taskgroup.create_task(self.consumer_run())
async def run(self): async def consumer_run(self):
while self.running: while self.running:
@ -102,7 +119,19 @@ class Consumer:
try: try:
await self.consume() print(
"Starting", self.concurrency, "receiver threads",
flush=True
)
async with asyncio.TaskGroup() as tg:
tasks = []
for i in range(0, self.concurrency):
tasks.append(
tg.create_task(self.consume_from_queue())
)
if self.metrics: if self.metrics:
self.metrics.state("stopped") self.metrics.state("stopped")
@ -120,7 +149,7 @@ class Consumer:
self.consumer.unsubscribe() self.consumer.unsubscribe()
self.consumer.close() self.consumer.close()
async def consume(self): async def consume_from_queue(self):
while self.running: while self.running:
@ -134,71 +163,75 @@ class Consumer:
except Exception as e: except Exception as e:
raise e raise e
expiry = time.time() + self.rate_limit_timeout await self.handle_one_from_queue(msg)
# This loop is for retry on rate-limit / resource limits async def handle_one_from_queue(self, msg):
while self.running:
if time.time() > expiry: expiry = time.time() + self.rate_limit_timeout
print("Gave up waiting for rate-limit retry", flush=True) # This loop is for retry on rate-limit / resource limits
while self.running:
# Message failed to be processed, this causes it to if time.time() > expiry:
# be retried
self.consumer.negative_acknowledge(msg)
if self.metrics: print("Gave up waiting for rate-limit retry", flush=True)
self.metrics.process("error")
# Break out of retry loop, processes next message # Message failed to be processed, this causes it to
break # be retried
self.consumer.negative_acknowledge(msg)
try: if self.metrics:
self.metrics.process("error")
print("Handle...", flush=True) # Break out of retry loop, processes next message
break
if self.metrics: try:
with self.metrics.record_time(): print("Handle...", flush=True)
await self.handler(msg, self, self.flow)
else: if self.metrics:
with self.metrics.record_time():
await self.handler(msg, self, self.flow) await self.handler(msg, self, self.flow)
print("Handled.", flush=True) else:
await self.handler(msg, self, self.flow)
# Acknowledge successful processing of the message print("Handled.", flush=True)
self.consumer.acknowledge(msg)
if self.metrics: # Acknowledge successful processing of the message
self.metrics.process("success") self.consumer.acknowledge(msg)
# Break out of retry loop if self.metrics:
break self.metrics.process("success")
except TooManyRequests: # Break out of retry loop
break
print("TooManyRequests: will retry...", flush=True) except TooManyRequests:
if self.metrics: print("TooManyRequests: will retry...", flush=True)
self.metrics.rate_limit()
# Sleep if self.metrics:
await asyncio.sleep(self.rate_limit_retry_time) self.metrics.rate_limit()
# Continue from retry loop, just causes a reprocessing # Sleep
continue await asyncio.sleep(self.rate_limit_retry_time)
except Exception as e: # Continue from retry loop, just causes a reprocessing
continue
print("consume exception:", e, flush=True) except Exception as e:
# Message failed to be processed, this causes it to print("consume exception:", e, flush=True)
# be retried
self.consumer.negative_acknowledge(msg)
if self.metrics: # Message failed to be processed, this causes it to
self.metrics.process("error") # be retried
self.consumer.negative_acknowledge(msg)
# Break out of retry loop, processes next message if self.metrics:
break self.metrics.process("error")
# Break out of retry loop, processes next message
break

View file

@ -4,10 +4,11 @@ from . consumer import Consumer
from . spec import Spec from . spec import Spec
class ConsumerSpec(Spec): class ConsumerSpec(Spec):
def __init__(self, name, schema, handler): def __init__(self, name, schema, handler, concurrency = 1):
self.name = name self.name = name
self.schema = schema self.schema = schema
self.handler = handler self.handler = handler
self.concurrency = concurrency
def add(self, flow, processor, definition): def add(self, flow, processor, definition):
@ -24,6 +25,7 @@ class ConsumerSpec(Spec):
schema = self.schema, schema = self.schema,
handler = self.handler, handler = self.handler,
metrics = consumer_metrics, metrics = consumer_metrics,
concurrency = self.concurrency
) )
# Consumer handle gets access to producers and other # Consumer handle gets access to producers and other

View file

@ -11,9 +11,13 @@ from .. exceptions import TooManyRequests
from .. base import FlowProcessor, ConsumerSpec, ProducerSpec from .. base import FlowProcessor, ConsumerSpec, ProducerSpec
default_ident = "text-completion" default_ident = "text-completion"
default_concurrency = 1
class LlmResult: class LlmResult:
def __init__(self, text=None, in_token=None, out_token=None, model=None): def __init__(
self, text = None, in_token = None, out_token = None,
model = None,
):
self.text = text self.text = text
self.in_token = in_token self.in_token = in_token
self.out_token = out_token self.out_token = out_token
@ -25,14 +29,19 @@ class LlmService(FlowProcessor):
def __init__(self, **params): def __init__(self, **params):
id = params.get("id") id = params.get("id")
concurrency = params.get("concurrency", 1)
super(LlmService, self).__init__(**params | { "id": id }) super(LlmService, self).__init__(**params | {
"id": id,
"concurrency": concurrency,
})
self.register_specification( self.register_specification(
ConsumerSpec( ConsumerSpec(
name = "request", name = "request",
schema = TextCompletionRequest, schema = TextCompletionRequest,
handler = self.on_request handler = self.on_request,
concurrency = concurrency,
) )
) )
@ -115,5 +124,12 @@ class LlmService(FlowProcessor):
@staticmethod @staticmethod
def add_args(parser): def add_args(parser):
parser.add_argument(
'-c', '--concurrency',
type=int,
default=default_concurrency,
help=f'LLM max output tokens (default: {default_concurrency})'
)
FlowProcessor.add_args(parser) FlowProcessor.add_args(parser)

View file

@ -1,4 +1,8 @@
# Subscriber is similar to consumer: It provides a service to take stuff
# off of a queue and make it available using an internal broker system,
# so suitable for when multiple recipients are reading from the same queue
from pulsar.schema import JsonSchema from pulsar.schema import JsonSchema
import asyncio import asyncio
import _pulsar import _pulsar