2024-12-02 17:41:30 +00:00
|
|
|
|
2025-04-22 20:21:38 +01:00
|
|
|
from pulsar.schema import JsonSchema
|
|
|
|
|
import asyncio
|
|
|
|
|
import _pulsar
|
2024-12-03 18:03:00 +00:00
|
|
|
import time
|
2024-12-02 17:41:30 +00:00
|
|
|
|
|
|
|
|
class Subscriber:
    """Fan out messages from a Pulsar subscription to asyncio queues.

    A background task (created by :meth:`start`) receives messages from
    ``topic`` and delivers each decoded value to:

    * the queue registered with :meth:`subscribe` under the message's
      ``"id"`` property, if there is one; and
    * every queue registered with :meth:`subscribe_all`.

    Messages are acknowledged as soon as they are received, *before* they
    are delivered.  A value that cannot be queued within ``put_timeout``
    seconds is dropped (counted via ``metrics.dropped()`` when metrics are
    configured) and is not retried by this class.
    """

    def __init__(self, client, topic, subscription, consumer_name,
                 schema=None, max_size=100, metrics=None, put_timeout=1):
        """
        Args:
            client: Pulsar client; only its ``subscribe`` method is used.
            topic: topic to consume from.
            subscription: subscription name passed to ``client.subscribe``.
            consumer_name: consumer name passed to ``client.subscribe``.
            schema: record class wrapped in ``JsonSchema`` when subscribing.
                NOTE(review): the ``None`` default is passed through as-is;
                presumably callers must supply a real schema — confirm.
            max_size: capacity of each queue created by ``subscribe`` /
                ``subscribe_all``.
            metrics: optional object with ``state(str)``, ``received()``
                and ``dropped()`` methods; ``None`` disables metrics.
            put_timeout: seconds to wait for space in a full queue before
                dropping the value.
        """
        self.client = client
        self.topic = topic
        self.subscription = subscription
        self.consumer_name = consumer_name
        self.schema = schema

        # Per-id queues, keyed by the message "id" property.
        self.q = {}
        # Queues that receive every message regardless of id.
        self.full = {}

        self.max_size = max_size
        self.put_timeout = put_timeout

        # Guards self.q / self.full against concurrent (un)subscribe calls
        # while run() is delivering.
        self.lock = asyncio.Lock()

        self.running = True
        self.metrics = metrics

        # Background run() task; set by start().
        self.task = None

    def __del__(self):
        # Best-effort signal to a still-running run() loop to exit.
        self.running = False

    async def start(self):
        """Launch the background receive loop as an asyncio task."""
        self.task = asyncio.create_task(self.run())

    async def stop(self):
        """Ask the receive loop to exit and wait for it to finish.

        Safe to call even if :meth:`start` was never called.
        """
        self.running = False
        if self.task is not None:
            await self.task

    async def join(self):
        """Stop the receive loop and wait for it to finish."""
        # stop() already awaits the task; awaiting it again is unnecessary.
        await self.stop()

    async def run(self):
        """Receive loop: connect, consume, deliver; reconnect on failure.

        Runs until ``self.running`` is cleared.  Any exception while
        subscribing or receiving is printed, the consumer is closed, and
        after a 1 second pause a new consumer is created.
        """
        consumer = None

        while self.running:

            if self.metrics:
                self.metrics.state("stopped")

            try:
                consumer = self.client.subscribe(
                    topic = self.topic,
                    subscription_name = self.subscription,
                    consumer_name = self.consumer_name,
                    schema = JsonSchema(self.schema),
                )

                if self.metrics:
                    self.metrics.state("running")

                print("Subscriber running...", flush=True)

                while self.running:

                    try:
                        # receive() blocks, so run it off the event loop;
                        # the short timeout lets us re-check self.running.
                        msg = await asyncio.to_thread(
                            consumer.receive,
                            timeout_millis=250
                        )
                    except _pulsar.Timeout:
                        continue
                    except Exception as e:
                        print("Exception:", e, flush=True)
                        print(type(e), flush=True)
                        raise

                    if self.metrics:
                        self.metrics.received()

                    # Acknowledge successful reception of the message
                    # (done before delivery, so drops below are not retried).
                    consumer.acknowledge(msg)

                    await self._deliver(msg)

            except Exception as e:
                print("Subscriber exception:", e, flush=True)

            finally:
                if consumer:
                    # A failing close must not kill the reconnect loop.
                    try:
                        consumer.close()
                    except Exception as e:
                        print("Consumer close:", e, flush=True)
                    consumer = None

            if self.metrics:
                self.metrics.state("stopped")

            if not self.running:
                return

            # If handler drops out, sleep a retry
            await asyncio.sleep(1)

    async def _deliver(self, msg):
        """Queue msg's value for its "id" subscriber and all subscribe_all queues."""
        try:
            msg_id = msg.properties()["id"]
        except Exception:
            # Message carries no usable "id" property.
            msg_id = None

        value = msg.value()

        async with self.lock:
            if msg_id in self.q:
                await self._put(self.q[msg_id], value)
            for q in self.full.values():
                await self._put(q, value)

    async def _put(self, q, value):
        """Put value on q, waiting at most put_timeout seconds; drop it on failure."""
        try:
            # FIXME: Timeout means data goes missing
            await asyncio.wait_for(q.put(value), timeout=self.put_timeout)
        except Exception as e:
            if self.metrics:
                self.metrics.dropped()
            print("Q Put:", e, flush=True)

    async def subscribe(self, id):
        """Register and return a new queue receiving messages whose "id" is ``id``.

        A later call with the same ``id`` replaces the earlier queue.
        """
        async with self.lock:
            q = asyncio.Queue(maxsize=self.max_size)
            self.q[id] = q
        return q

    async def unsubscribe(self, id):
        """Remove the per-id queue for ``id``; no-op if none is registered."""
        async with self.lock:
            if id in self.q:
                # self.q[id].shutdown(immediate=True)
                del self.q[id]

    async def subscribe_all(self, id):
        """Register and return a new queue (keyed by ``id``) receiving every message."""
        async with self.lock:
            q = asyncio.Queue(maxsize=self.max_size)
            self.full[id] = q
        return q

    async def unsubscribe_all(self, id):
        """Remove the receive-everything queue keyed by ``id``; no-op if absent."""
        async with self.lock:
            if id in self.full:
                # self.full[id].shutdown(immediate=True)
                del self.full[id]
|
2024-12-02 17:41:30 +00:00
|
|
|
|