trustgraph/trustgraph-utils/scripts/tg-init-pulsar
cybermaggedon 9b91d5eee3
Feature/pkgsplit (#83)
* Starting to spawn base package
* More package hacking
* Bedrock and VertexAI
* Parquet split
* Updated templates
* Utils
2024-09-30 19:36:09 +01:00

119 lines
2.8 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Initialises Pulsar with Trustgraph tenant / namespaces & policy
"""
import requests
import time
import argparse
# Base URL of the Pulsar admin REST service; used as the default value of
# the --pulsar-admin-url command-line option.
default_pulsar_admin_url = "http://pulsar:8080"
def get_clusters(url, timeout=30):
    """
    Fetch the cluster list from the Pulsar admin REST API.

    Parameters:
        url: Base URL of the Pulsar admin service; "/admin/v2/clusters"
            is appended to it.
        timeout: Seconds to wait for the server to respond, passed to
            requests as its timeout.

    Returns:
        The decoded JSON body of the response (presumably a list of
        cluster names -- confirm against the Pulsar admin API).

    Raises:
        RuntimeError: If the response status is not 200.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    print("Get clusters...", flush=True)

    # Without an explicit timeout requests waits indefinitely, which would
    # hang the caller instead of letting it retry.
    resp = requests.get(f"{url}/admin/v2/clusters", timeout=timeout)

    if resp.status_code != 200:
        raise RuntimeError("Could not fetch clusters")

    return resp.json()
def ensure_tenant(url, tenant, clusters, timeout=30):
    """
    Create a Pulsar tenant unless it already exists.

    Parameters:
        url: Base URL of the Pulsar admin service.
        tenant: Name of the tenant to look up and, if absent, create.
        clusters: Value sent as "allowedClusters" in the creation request.
        timeout: Seconds to wait for the server to respond on each HTTP
            request, passed to requests as its timeout.

    Raises:
        RuntimeError: If the creation request does not return status 204.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    tenant_url = f"{url}/admin/v2/tenants/{tenant}"

    # A 200 on lookup means the tenant is already there; nothing to do.
    resp = requests.get(tenant_url, timeout=timeout)
    if resp.status_code == 200:
        print(f"Tenant {tenant} already exists.", flush=True)
        return

    resp = requests.put(
        tenant_url,
        json={
            "adminRoles": [],
            "allowedClusters": clusters,
        },
        timeout=timeout,
    )

    if resp.status_code != 204:
        # Report status code as well as body, matching ensure_namespace.
        print(resp.status_code, flush=True)
        print(resp.text, flush=True)
        raise RuntimeError("Tenant creation failed.")

    print(f"Tenant {tenant} created.", flush=True)
def ensure_namespace(url, tenant, namespace, config, timeout=30):
    """
    Create a Pulsar namespace under a tenant unless it already exists.

    Parameters:
        url: Base URL of the Pulsar admin service.
        tenant: Tenant that owns the namespace.
        namespace: Name of the namespace to look up and, if absent, create.
        config: JSON-serialisable body sent with the creation request.
        timeout: Seconds to wait for the server to respond on each HTTP
            request, passed to requests as its timeout.

    Raises:
        RuntimeError: If the creation request does not return status 204.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    namespace_url = f"{url}/admin/v2/namespaces/{tenant}/{namespace}"

    # A 200 on lookup means the namespace is already there; nothing to do.
    resp = requests.get(namespace_url, timeout=timeout)
    if resp.status_code == 200:
        print(f"Namespace {tenant}/{namespace} already exists.", flush=True)
        return

    resp = requests.put(namespace_url, json=config, timeout=timeout)

    if resp.status_code != 204:
        print(resp.status_code, flush=True)
        print(resp.text, flush=True)
        raise RuntimeError(f"Namespace {tenant}/{namespace} creation failed.")

    print(f"Namespace {tenant}/{namespace} created.", flush=True)
def init(url, tenant="tg"):
    """
    Set up the tenant and its namespaces on a Pulsar instance.

    Fetches the cluster list, ensures the tenant exists with those
    clusters allowed, then ensures the "flow", "request" and "response"
    namespaces exist, in that order.

    Parameters:
        url: Base URL of the Pulsar admin service.
        tenant: Tenant name to create the namespaces under.
    """
    # Namespaces in creation order, each paired with the body sent when it
    # is created. Only "response" carries a retention policy.
    namespace_configs = (
        ("flow", {}),
        ("request", {}),
        ("response", {
            "retention_policies": {
                "retentionSizeInMB": -1,
                "retentionTimeInMinutes": 3,
            }
        }),
    )

    ensure_tenant(url, tenant, get_clusters(url))

    for name, body in namespace_configs:
        ensure_namespace(url, tenant, name, body)
def main():
    """
    Command-line entry point.

    Parses the --pulsar-admin-url option and runs init() against it for
    tenant "tg". Any Exception is printed and the attempt is repeated
    after a 2 second pause, until one attempt succeeds.
    """
    cli = argparse.ArgumentParser(prog='tg-init-pulsar', description=__doc__)
    cli.add_argument(
        '-p', '--pulsar-admin-url',
        default=default_pulsar_admin_url,
        help=f'Pulsar admin URL (default: {default_pulsar_admin_url})',
    )
    args = cli.parse_args()

    while True:

        try:
            print(flush=True)
            print(
                f"Initialising with Pulsar {args.pulsar_admin_url}...",
                flush=True
            )
            init(args.pulsar_admin_url, "tg")
            print("Initialisation complete.", flush=True)
            # Success: nothing follows the loop, so leave the function.
            return
        except Exception as err:
            print("Exception:", err, flush=True)

        # Only reached after a failed attempt.
        print("Sleeping...", flush=True)
        time.sleep(2)
        print("Will retry...", flush=True)
# Run only when executed as a script, so that importing this module does
# not start the initialisation loop as a side effect.
if __name__ == "__main__":
    main()