Added config reload handler mechanism, calls a Python method on config (#334)

This commit is contained in:
cybermaggedon 2025-04-02 00:23:30 +01:00 committed by GitHub
parent 88eae0a9f0
commit a2c64cad4a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 60 additions and 5 deletions

View file

@ -3,12 +3,18 @@ import asyncio
import os
import argparse
import pulsar
from pulsar.schema import JsonSchema
import _pulsar
import time
import uuid
from prometheus_client import start_http_server, Info
from .. schema import ConfigPush, config_push_queue
from .. log_level import LogLevel
default_config_queue = config_push_queue
config_subscriber_id = str(uuid.uuid4())
class BaseProcessor:
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
@ -34,6 +40,11 @@ class BaseProcessor:
pulsar_api_key = params.get("pulsar_api_key", None)
log_level = params.get("log_level", LogLevel.INFO)
self.config_push_queue = params.get(
"config_push_queue",
default_config_queue
)
self.pulsar_host = pulsar_host
self.pulsar_api_key = pulsar_api_key
@ -53,6 +64,12 @@ class BaseProcessor:
self.pulsar_listener = pulsar_listener
self.config_subscriber = self.client.subscribe(
self.config_push_queue, config_subscriber_id,
consumer_type=pulsar.ConsumerType.Shared,
schema=JsonSchema(ConfigPush),
)
def __del__(self):
if hasattr(self, "client"):
@ -74,6 +91,12 @@ class BaseProcessor:
help=f'Pulsar API key',
)
parser.add_argument(
'--config-push-queue',
default=default_config_queue,
help=f'Config push queue {default_config_queue}',
)
parser.add_argument(
'--pulsar-listener',
help=f'Pulsar listener (default: none)',
@ -104,14 +127,44 @@ class BaseProcessor:
async def start(self):
pass
async def run_config_queue(self):
if self.module == "config.service":
print("I am config-svc, not looking at config queue", flush=True)
return
print("Config thread running", flush=True)
while True:
try:
msg = await asyncio.to_thread(
self.config_subscriber.receive, timeout_millis=2000
)
except pulsar.Timeout:
continue
v = msg.value()
print("Got config version", v.version, flush=True)
await self.on_config(v.version, v.config)
async def on_config(self, version, config):
pass
async def run(self):
raise RuntimeError("Something should have implemented the run method")
@classmethod
async def launch_async(cls, args):
async def launch_async(cls, args, prog):
p = cls(**args)
p.module = prog
await p.start()
await p.run()
task1 = asyncio.create_task(p.run_config_queue())
task2 = asyncio.create_task(p.run())
await asyncio.gather(task1, task2)
@classmethod
def launch(cls, prog, doc):
@ -135,7 +188,7 @@ class BaseProcessor:
try:
asyncio.run(cls.launch_async(args))
asyncio.run(cls.launch_async(args, prog))
except KeyboardInterrupt:
print("Keyboard interrupt.")

View file

@ -82,7 +82,7 @@ class Consumer(BaseProcessor):
while True:
msg = self.consumer.receive()
msg = await asyncio.to_thread(self.consumer.receive)
expiry = time.time() + self.rate_limit_timeout

View file

@ -76,13 +76,15 @@ def init(url, tenant="tg"):
"retention_policies": {
"retentionSizeInMB": -1,
"retentionTimeInMinutes": 3,
"subscription_expiration_time_minutes": 30,
}
})
ensure_namespace(url, tenant, "config", {
"retention_policies": {
"retentionSizeInMB": 50,
"retentionSizeInMB": 10,
"retentionTimeInMinutes": -1,
"subscription_expiration_time_minutes": 5,
}
})