#!/usr/bin/env python3 """ Initialises Pulsar with Trustgraph tenant / namespaces & policy """ import requests import time import argparse default_pulsar_admin_url = "http://pulsar:8080" def get_clusters(url): print("Get clusters...", flush=True) resp = requests.get(f"{url}/admin/v2/clusters") if resp.status_code != 200: raise RuntimeError("Could not fetch clusters") return resp.json() def ensure_tenant(url, tenant, clusters): resp = requests.get(f"{url}/admin/v2/tenants/{tenant}") if resp.status_code == 200: print(f"Tenant {tenant} already exists.", flush=True) return resp = requests.put( f"{url}/admin/v2/tenants/{tenant}", json={ "adminRoles": [], "allowedClusters": clusters, } ) if resp.status_code != 204: print(resp.text, flush=True) raise RuntimeError("Tenant creation failed.") print(f"Tenant {tenant} created.", flush=True) def ensure_namespace(url, tenant, namespace, config): resp = requests.get(f"{url}/admin/v2/namespaces/{tenant}/{namespace}") if resp.status_code == 200: print(f"Namespace {tenant}/{namespace} already exists.", flush=True) return resp = requests.put( f"{url}/admin/v2/namespaces/{tenant}/{namespace}", json=config, ) if resp.status_code != 204: print(resp.status_code, flush=True) print(resp.text, flush=True) raise RuntimeError(f"Namespace {tenant}/{namespace} creation failed.") print(f"Namespace {tenant}/{namespace} created.", flush=True) def init(url, tenant="tg"): clusters = get_clusters(url) ensure_tenant(url, tenant, clusters) ensure_namespace(url, tenant, "flow", {}) ensure_namespace(url, tenant, "request", {}) ensure_namespace(url, tenant, "response", { "retention_policies": { "retentionSizeInMB": -1, "retentionTimeInMinutes": 3, } }) def main(): parser = argparse.ArgumentParser( prog='tg-init-pulsar', description=__doc__, ) parser.add_argument( '-p', '--pulsar-admin-url', default=default_pulsar_admin_url, help=f'Pulsar admin URL (default: {default_pulsar_admin_url})', ) args = parser.parse_args() while True: try: print(flush=True) print( f"Initialising with Pulsar {args.pulsar_admin_url}...", flush=True ) init(args.pulsar_admin_url, "tg") print("Initialisation complete.", flush=True) break except Exception as e: print("Exception:", e, flush=True) print("Sleeping...", flush=True) time.sleep(2) print("Will retry...", flush=True) main()