# Consumer is similar to subscriber: It takes information from a queue # and passes on to a processor function. This is the main receiving # loop for TrustGraph processors. Incorporates retry functionality # Note: there is a 'defect' in the system which is tolerated, althought # the processing handlers are async functions, ideally implementation # would use all async code. In practice if the processor only implements # one handler, and a single thread of concurrency, nothing too outrageous # will happen if synchronous / blocking code is used import asyncio import time import logging from concurrent.futures import ThreadPoolExecutor from .. exceptions import TooManyRequests # Module logger logger = logging.getLogger(__name__) # Timeout exception - can come from different backends class TimeoutError(Exception): pass class Consumer: def __init__( self, taskgroup, flow, backend, topic, subscriber, schema, handler, metrics = None, start_of_messages=False, rate_limit_retry_time = 10, rate_limit_timeout = 7200, reconnect_time = 5, concurrency = 1, # Number of concurrent requests to handle **kwargs, ): self.taskgroup = taskgroup self.flow = flow self.backend = backend self.topic = topic self.subscriber = subscriber self.schema = schema self.handler = handler self.rate_limit_retry_time = rate_limit_retry_time self.rate_limit_timeout = rate_limit_timeout self.reconnect_time = 5 self.start_of_messages = start_of_messages self.running = True self.consumer_task = None self.concurrency = concurrency self.metrics = metrics self.consumer = None def __del__(self): self.running = False if hasattr(self, "consumer"): if self.consumer: self.consumer.unsubscribe() self.consumer.close() self.consumer = None async def stop(self): self.running = False if self.consumer_task: await self.consumer_task self.consumer_task = None async def start(self): self.running = True # Puts it in the stopped state, the run thread should set running if self.metrics: self.metrics.state("stopped") self.consumer_task = self.taskgroup.create_task(self.consumer_run()) async def consumer_run(self): while self.running: if self.metrics: self.metrics.state("stopped") # Determine initial position if self.start_of_messages: initial_pos = 'earliest' else: initial_pos = 'latest' if self.metrics: self.metrics.state("running") try: logger.info(f"Starting {self.concurrency} receiver threads") # Create one backend consumer per concurrent task. # Each gets its own connection and dedicated thread — # required for backends like RabbitMQ where connections # are not thread-safe (pika BlockingConnection must be # used from a single thread). consumers = [] executors = [] for i in range(self.concurrency): try: logger.info(f"Subscribing to topic: {self.topic} (worker {i})") executor = ThreadPoolExecutor(max_workers=1) loop = asyncio.get_event_loop() c = await loop.run_in_executor( executor, lambda: self.backend.create_consumer( topic = self.topic, subscription = self.subscriber, schema = self.schema, initial_position = initial_pos, ), ) consumers.append(c) executors.append(executor) logger.info(f"Successfully subscribed to topic: {self.topic} (worker {i})") except Exception as e: logger.error(f"Consumer subscription exception (worker {i}): {e}", exc_info=True) raise async with asyncio.TaskGroup() as tg: for c, ex in zip(consumers, executors): tg.create_task(self.consume_from_queue(c, ex)) if self.metrics: self.metrics.state("stopped") except Exception as e: logger.error(f"Consumer loop exception: {e}", exc_info=True) for c in consumers: try: c.unsubscribe() c.close() except Exception: pass for ex in executors: ex.shutdown(wait=False) consumers = [] executors = [] await asyncio.sleep(self.reconnect_time) continue finally: for c in consumers: try: c.unsubscribe() c.close() except Exception: pass for ex in executors: ex.shutdown(wait=False) async def consume_from_queue(self, consumer, executor=None): loop = asyncio.get_event_loop() while self.running: try: msg = await loop.run_in_executor( executor, lambda: consumer.receive(timeout_millis=100), ) except Exception as e: # Handle timeout from any backend if 'timeout' in str(type(e)).lower() or 'timeout' in str(e).lower(): continue raise e await self.handle_one_from_queue(msg, consumer, executor) async def handle_one_from_queue(self, msg, consumer, executor=None): loop = asyncio.get_event_loop() expiry = time.time() + self.rate_limit_timeout # This loop is for retry on rate-limit / resource limits while self.running: if time.time() > expiry: logger.warning("Gave up waiting for rate-limit retry") # Message failed to be processed, this causes it to # be retried. Ack on the consumer's dedicated thread # (pika is not thread-safe). await loop.run_in_executor( executor, lambda: consumer.negative_acknowledge(msg) ) if self.metrics: self.metrics.process("error") # Break out of retry loop, processes next message break try: logger.debug("Processing message...") if self.metrics: with self.metrics.record_time(): await self.handler(msg, self, self.flow) else: await self.handler(msg, self, self.flow) logger.debug("Message processed successfully") # Acknowledge on the consumer's dedicated thread # (pika is not thread-safe) await loop.run_in_executor( executor, lambda: consumer.acknowledge(msg) ) if self.metrics: self.metrics.process("success") # Break out of retry loop break except TooManyRequests: logger.warning("Rate limit exceeded, will retry...") if self.metrics: self.metrics.rate_limit() # Sleep await asyncio.sleep(self.rate_limit_retry_time) # Contine from retry loop, just causes a reprocessing continue except Exception as e: logger.error(f"Message processing exception: {e}", exc_info=True) # Message failed to be processed, this causes it to # be retried. Ack on the consumer's dedicated thread. await loop.run_in_executor( executor, lambda: consumer.negative_acknowledge(msg) ) if self.metrics: self.metrics.process("error") # Break out of retry loop, processes next message break