diff --git a/trustgraph-base/trustgraph/base/base_processor.py b/trustgraph-base/trustgraph/base/base_processor.py index a8374538..35551b4f 100644 --- a/trustgraph-base/trustgraph/base/base_processor.py +++ b/trustgraph-base/trustgraph/base/base_processor.py @@ -3,12 +3,18 @@ import asyncio import os import argparse import pulsar +from pulsar.schema import JsonSchema import _pulsar import time +import uuid from prometheus_client import start_http_server, Info +from .. schema import ConfigPush, config_push_queue from .. log_level import LogLevel +default_config_queue = config_push_queue +config_subscriber_id = str(uuid.uuid4()) + class BaseProcessor: default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') @@ -34,6 +40,11 @@ class BaseProcessor: pulsar_api_key = params.get("pulsar_api_key", None) log_level = params.get("log_level", LogLevel.INFO) + self.config_push_queue = params.get( + "config_push_queue", + default_config_queue + ) + self.pulsar_host = pulsar_host self.pulsar_api_key = pulsar_api_key @@ -53,6 +64,12 @@ class BaseProcessor: self.pulsar_listener = pulsar_listener + self.config_subscriber = self.client.subscribe( + self.config_push_queue, config_subscriber_id, + consumer_type=pulsar.ConsumerType.Shared, + schema=JsonSchema(ConfigPush), + ) + def __del__(self): if hasattr(self, "client"): @@ -74,6 +91,12 @@ class BaseProcessor: help=f'Pulsar API key', ) + parser.add_argument( + '--config-push-queue', + default=default_config_queue, + help=f'Config push queue {default_config_queue}', + ) + parser.add_argument( '--pulsar-listener', help=f'Pulsar listener (default: none)', @@ -104,14 +127,44 @@ class BaseProcessor: async def start(self): pass + async def run_config_queue(self): + + if self.module == "config.service": + print("I am config-svc, not looking at config queue", flush=True) + return + + print("Config thread running", flush=True) + + while True: + + try: + msg = await asyncio.to_thread( + self.config_subscriber.receive, timeout_millis=2000 + ) + except pulsar.Timeout: + continue + + v = msg.value() + print("Got config version", v.version, flush=True) + + await self.on_config(v.version, v.config) + + async def on_config(self, version, config): + pass + async def run(self): raise RuntimeError("Something should have implemented the run method") @classmethod - async def launch_async(cls, args): + async def launch_async(cls, args, prog): p = cls(**args) + p.module = prog await p.start() - await p.run() + + task1 = asyncio.create_task(p.run_config_queue()) + task2 = asyncio.create_task(p.run()) + + await asyncio.gather(task1, task2) @classmethod def launch(cls, prog, doc): @@ -135,7 +188,7 @@ class BaseProcessor: try: - asyncio.run(cls.launch_async(args)) + asyncio.run(cls.launch_async(args, prog)) except KeyboardInterrupt: print("Keyboard interrupt.") diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index 175f1fd7..fdbe5531 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -82,7 +82,7 @@ class Consumer(BaseProcessor): while True: - msg = self.consumer.receive() + msg = await asyncio.to_thread(self.consumer.receive) expiry = time.time() + self.rate_limit_timeout diff --git a/trustgraph-cli/scripts/tg-init-pulsar b/trustgraph-cli/scripts/tg-init-pulsar index 98b5072f..69a13411 100755 --- a/trustgraph-cli/scripts/tg-init-pulsar +++ b/trustgraph-cli/scripts/tg-init-pulsar @@ -76,13 +76,15 @@ def init(url, tenant="tg"): "retention_policies": { "retentionSizeInMB": -1, "retentionTimeInMinutes": 3, + "subscription_expiration_time_minutes": 30, } }) ensure_namespace(url, tenant, "config", { "retention_policies": { - "retentionSizeInMB": 50, + "retentionSizeInMB": 10, "retentionTimeInMinutes": -1, + "subscription_expiration_time_minutes": 5, } })