Config & Pulsar bootstrap

This commit is contained in:
Cyber MacGeddon 2026-04-22 09:19:40 +01:00
parent dac3361111
commit 42226729bc
8 changed files with 670 additions and 0 deletions

View file

@ -61,7 +61,9 @@ api-gateway = "trustgraph.gateway:run"
chunker-recursive = "trustgraph.chunking.recursive:run"
chunker-token = "trustgraph.chunking.token:run"
config-svc = "trustgraph.config.service:run"
config-bootstrap = "trustgraph.bootstrap.config_bootstrap:run"
flow-svc = "trustgraph.flow.service:run"
pulsar-bootstrap = "trustgraph.bootstrap.pulsar_bootstrap:run"
doc-embeddings-query-milvus = "trustgraph.query.doc_embeddings.milvus:run"
doc-embeddings-query-pinecone = "trustgraph.query.doc_embeddings.pinecone:run"
doc-embeddings-query-qdrant = "trustgraph.query.doc_embeddings.qdrant:run"

View file

@ -0,0 +1 @@
from . service import *

View file

@ -0,0 +1,6 @@
#!/usr/bin/env python3
from . service import run
if __name__ == '__main__':
run()

View file

@ -0,0 +1,430 @@
"""
Config bootstrap service.
Runs a slow reconciliation loop that:
1. Seeds the ``__template__`` workspace from an external JSON seed
file on first boot. The seed file has the same shape as
``tg-init-trustgraph``'s input: a top-level dict of
``{ config-type: { config-key: value } }`` where values are the
raw (JSON-serialisable) payloads to store.
2. On first boot, copies ``__template__`` into a normal workspace
(default: ``default``) so a single-tenant system has a working
workspace without manual provisioning.
3. Optionally starts a default flow in that workspace using a
specified blueprint.
Each of these three steps is gated on a durable marker stored in the
reserved ``__system__`` workspace. Markers are set on successful
completion of their own step; once set they are never touched again,
so the loop becomes a cheap no-op after first boot. Markers are also
independent: a partial failure (e.g. default workspace seeded but
flow-start failed) is resumable on next wake.
Reserved workspaces ``__template__`` and ``__system__`` start with an
underscore and so are excluded from config push notifications no
live service sees their contents.
Cadence:
- CYCLE_INTERVAL (300s) on success / steady state
- CONNECT_BACKOFF (10s) on connection failure to config-svc
"""
import argparse
import asyncio
import json
import logging
import uuid
from argparse import ArgumentParser
from trustgraph.base import AsyncProcessor
from trustgraph.base import ProducerMetrics, SubscriberMetrics
from trustgraph.base.config_client import ConfigClient
from trustgraph.schema import (
ConfigRequest, ConfigResponse,
config_request_queue, config_response_queue,
)
from trustgraph.schema import (
FlowRequest, FlowResponse,
flow_request_queue, flow_response_queue,
)
from trustgraph.base.request_response_spec import RequestResponse
logger = logging.getLogger(__name__)
default_ident = "config-bootstrap"
# Reserved workspaces
TEMPLATE_WORKSPACE = "__template__"
SYSTEM_WORKSPACE = "__system__"
# System marker type + keys
BOOTSTRAP_MARKER_TYPE = "bootstrap"
TEMPLATE_SEEDED_KEY = "template-seeded"
DEFAULT_WORKSPACE_SEEDED_KEY = "default-workspace-seeded"
DEFAULT_FLOW_STARTED_KEY = "default-flow-started"
# Loop cadence
CYCLE_INTERVAL = 300
CONNECT_BACKOFF = 10
default_default_workspace = "default"
default_default_flow_id = "default"
default_default_flow_description = "Default"
class Processor(AsyncProcessor):
def __init__(self, **params):
super().__init__(**params)
self.config_file = params.get("config_file")
self.default_workspace = params.get(
"default_workspace", default_default_workspace,
)
self.default_flow_id = params.get(
"default_flow_id", default_default_flow_id,
)
self.default_flow_blueprint = params.get(
"default_flow_blueprint",
)
self.default_flow_description = params.get(
"default_flow_description", default_default_flow_description,
)
# Optional flow parameters as JSON string
default_flow_parameters_raw = params.get("default_flow_parameters")
if default_flow_parameters_raw:
try:
self.default_flow_parameters = json.loads(
default_flow_parameters_raw
)
except Exception as e:
raise RuntimeError(
f"Invalid --default-flow-parameters JSON: {e}"
)
else:
self.default_flow_parameters = {}
if self.config_file is None:
raise RuntimeError(
"config_file is required (--config-file)"
)
logger.info(
f"Config bootstrap: seed file={self.config_file} "
f"default_workspace={self.default_workspace} "
f"default_flow_id={self.default_flow_id} "
f"default_flow_blueprint={self.default_flow_blueprint or '(none)'}"
)
# ------------------------------------------------------------------
# Helpers for talking to config-svc and flow-svc via req/resp.
# ------------------------------------------------------------------
def _make_config_client(self):
"""Short-lived ConfigClient over the shared pubsub backend."""
rr_id = str(uuid.uuid4())
req_metrics = ProducerMetrics(
processor=self.id, flow=None, name="config-request",
)
resp_metrics = SubscriberMetrics(
processor=self.id, flow=None, name="config-response",
)
return ConfigClient(
backend=self.pubsub_backend,
subscription=f"{self.id}--config--{rr_id}",
consumer_name=self.id,
request_topic=config_request_queue,
request_schema=ConfigRequest,
request_metrics=req_metrics,
response_topic=config_response_queue,
response_schema=ConfigResponse,
response_metrics=resp_metrics,
)
def _make_flow_client(self):
"""Short-lived RequestResponse client for flow-svc."""
rr_id = str(uuid.uuid4())
req_metrics = ProducerMetrics(
processor=self.id, flow=None, name="flow-request",
)
resp_metrics = SubscriberMetrics(
processor=self.id, flow=None, name="flow-response",
)
return RequestResponse(
backend=self.pubsub_backend,
subscription=f"{self.id}--flow--{rr_id}",
consumer_name=self.id,
request_topic=flow_request_queue,
request_schema=FlowRequest,
request_metrics=req_metrics,
response_topic=flow_response_queue,
response_schema=FlowResponse,
response_metrics=resp_metrics,
)
# ------------------------------------------------------------------
# Marker helpers. Markers live in __system__/bootstrap/<key>.
# ------------------------------------------------------------------
async def _marker_is_set(self, config, key):
value = await config.get(
SYSTEM_WORKSPACE, BOOTSTRAP_MARKER_TYPE, key,
)
return value is not None
async def _marker_set(self, config, key):
await config.put(
SYSTEM_WORKSPACE, BOOTSTRAP_MARKER_TYPE, key,
json.dumps(True),
)
# ------------------------------------------------------------------
# Seed data.
# ------------------------------------------------------------------
def _load_seed(self):
"""Read the seed JSON file from disk. Returns
``{type: {key: raw-value}}``."""
with open(self.config_file) as f:
return json.load(f)
async def _write_config_tree(self, config, workspace, tree):
"""Write every (type, key, value) pair from a seed tree into
the given workspace as a single batched put."""
values = []
for type_name, entries in tree.items():
for key, value in entries.items():
values.append(
(type_name, key, json.dumps(value))
)
if values:
await config.put_many(workspace, values)
# ------------------------------------------------------------------
# Reconciliation steps.
# ------------------------------------------------------------------
async def _seed_template(self, config):
"""Step 1: populate __template__ from the seed file if not
already seeded."""
if await self._marker_is_set(config, TEMPLATE_SEEDED_KEY):
logger.debug("Template already seeded.")
return False
logger.info(
f"Seeding __template__ workspace from {self.config_file}"
)
seed = self._load_seed()
await self._write_config_tree(config, TEMPLATE_WORKSPACE, seed)
await self._marker_set(config, TEMPLATE_SEEDED_KEY)
logger.info("Template seeded.")
return True
async def _bootstrap_default_workspace(self, config):
"""Step 2: copy __template__ content into the default
workspace on first boot."""
if await self._marker_is_set(
config, DEFAULT_WORKSPACE_SEEDED_KEY,
):
logger.debug("Default workspace already bootstrapped.")
return False
logger.info(
f"Bootstrapping default workspace '{self.default_workspace}' "
f"from __template__"
)
# Re-read the seed file so the default workspace is populated
# from the same source of truth as __template__. We do not
# copy from __template__ via the config service because
# __template__ may have been edited after seeding; the seed
# file is the factory default.
seed = self._load_seed()
await self._write_config_tree(
config, self.default_workspace, seed,
)
await self._marker_set(config, DEFAULT_WORKSPACE_SEEDED_KEY)
logger.info(
f"Default workspace '{self.default_workspace}' bootstrapped."
)
return True
async def _start_default_flow(self, config):
"""Step 3: start a default flow if a blueprint was supplied
and the flow has not already been started."""
if self.default_flow_blueprint is None:
logger.debug(
"No default-flow-blueprint configured, skipping."
)
return False
if await self._marker_is_set(
config, DEFAULT_FLOW_STARTED_KEY,
):
logger.debug("Default flow already started.")
return False
logger.info(
f"Starting default flow "
f"'{self.default_flow_id}' "
f"(blueprint '{self.default_flow_blueprint}') "
f"in workspace '{self.default_workspace}'"
)
flow_client = self._make_flow_client()
try:
await flow_client.start()
req = FlowRequest(
operation="start-flow",
workspace=self.default_workspace,
flow_id=self.default_flow_id,
blueprint_name=self.default_flow_blueprint,
description=self.default_flow_description,
parameters=self.default_flow_parameters,
)
resp = await flow_client.request(req, timeout=30)
if resp.error:
raise RuntimeError(
f"start-flow failed: "
f"{resp.error.type}: {resp.error.message}"
)
finally:
try:
await flow_client.stop()
except Exception:
pass
await self._marker_set(config, DEFAULT_FLOW_STARTED_KEY)
logger.info("Default flow started.")
return True
async def _reconcile_once(self):
"""Run all three steps in order; each is a cheap no-op once
its marker is set."""
config = self._make_config_client()
try:
await config.start()
await self._seed_template(config)
await self._bootstrap_default_workspace(config)
await self._start_default_flow(config)
finally:
try:
await config.stop()
except Exception:
pass
# ------------------------------------------------------------------
# Main loop — the processor-group / standalone entry point.
# ------------------------------------------------------------------
async def run(self):
logger.info("Config bootstrap loop starting.")
while self.running:
try:
await self._reconcile_once()
except (ConnectionError, TimeoutError, OSError) as e:
logger.info(
f"Bootstrap: connect/timeout error "
f"({type(e).__name__}: {e}); retry in "
f"{CONNECT_BACKOFF}s"
)
await asyncio.sleep(CONNECT_BACKOFF)
continue
except Exception as e:
logger.error(
f"Bootstrap reconcile failed: "
f"{type(e).__name__}: {e}",
exc_info=True,
)
# Treat as transient, fall through to long sleep so
# we don't spin on persistent failures.
await asyncio.sleep(CYCLE_INTERVAL)
# ------------------------------------------------------------------
# CLI arg plumbing.
# ------------------------------------------------------------------
@staticmethod
def add_args(parser: ArgumentParser) -> None:
AsyncProcessor.add_args(parser)
parser.add_argument(
'-C', '--config-file',
required=True,
help='Path to the seed JSON config file '
'({type: {key: value}} shape)',
)
parser.add_argument(
'-w', '--default-workspace',
default=default_default_workspace,
help=f'Name of the default workspace to bootstrap '
f'(default: {default_default_workspace})',
)
parser.add_argument(
'--default-flow-id',
default=default_default_flow_id,
help=f'Flow id to start in the default workspace '
f'(default: {default_default_flow_id})',
)
parser.add_argument(
'--default-flow-blueprint',
default=None,
help='Blueprint to use for the default flow. '
'If not set, no default flow is started.',
)
parser.add_argument(
'--default-flow-description',
default=default_default_flow_description,
help=f'Description for the default flow '
f'(default: "{default_default_flow_description}")',
)
parser.add_argument(
'--default-flow-parameters',
default=None,
help='Optional JSON string of parameters to pass to '
'start-flow (e.g. \'{"llm-model":"..."}\').',
)
def run():
Processor.launch(default_ident, __doc__)

View file

@ -0,0 +1 @@
from . service import *

View file

@ -0,0 +1,6 @@
#!/usr/bin/env python3
from . service import run
if __name__ == '__main__':
run()

View file

@ -0,0 +1,224 @@
"""
Pulsar topology bootstrap service.
Runs a slow reconciliation loop that ensures the Pulsar tenant and
namespaces TrustGraph requires are present, creating them if they
aren't. All operations are idempotent, so the loop is a cheap no-op
once the cluster has been initialised.
Only relevant when running on the Pulsar pub/sub backend for
RabbitMQ / Kafka this service can simply be omitted from the
processor group.
Namespaces created:
- ``<tenant>/flow`` data-path topics (durable, no retention limit)
- ``<tenant>/request`` request-class topics
- ``<tenant>/response`` response-class topics (short retention)
- ``<tenant>/notify`` notify topics (very short retention)
This is a thin reconciler around the same admin-API calls
``tg-init-trustgraph`` has always made; what's new is that it lives
in a processor group, keeps retrying on failure, and doesn't need a
dedicated container.
Cadence:
- CYCLE_INTERVAL (600s) on success / steady state
- CONNECT_BACKOFF (10s) on admin-API failure
"""
import asyncio
import logging
from argparse import ArgumentParser
import requests
from trustgraph.base import AsyncProcessor
logger = logging.getLogger(__name__)
default_ident = "pulsar-bootstrap"
default_pulsar_admin_url = "http://pulsar:8080"
default_tenant = "tg"
# Namespace configs. flow/request take broker defaults; response and
# notify have aggressive retention to keep the broker lean (short-lived
# reply and notification traffic only).
NAMESPACE_CONFIG = {
"flow": {},
"request": {},
"response": {
"retention_policies": {
"retentionSizeInMB": -1,
"retentionTimeInMinutes": 3,
"subscriptionExpirationTimeMinutes": 30,
},
},
"notify": {
"retention_policies": {
"retentionSizeInMB": -1,
"retentionTimeInMinutes": 3,
"subscriptionExpirationTimeMinutes": 5,
},
},
}
CYCLE_INTERVAL = 600
CONNECT_BACKOFF = 10
REQUEST_TIMEOUT = 10
class Processor(AsyncProcessor):
def __init__(self, **params):
super().__init__(**params)
self.pulsar_admin_url = params.get(
"pulsar_admin_url", default_pulsar_admin_url,
)
self.tenant = params.get("tenant", default_tenant)
logger.info(
f"Pulsar bootstrap: admin={self.pulsar_admin_url} "
f"tenant={self.tenant}"
)
# ------------------------------------------------------------------
# Admin API helpers — synchronous requests, wrapped so the loop can
# treat them like any other awaitable. Running them on the default
# executor keeps the event loop responsive.
# ------------------------------------------------------------------
def _get_clusters(self):
resp = requests.get(
f"{self.pulsar_admin_url}/admin/v2/clusters",
timeout=REQUEST_TIMEOUT,
)
resp.raise_for_status()
return resp.json()
def _tenant_exists(self):
resp = requests.get(
f"{self.pulsar_admin_url}/admin/v2/tenants/{self.tenant}",
timeout=REQUEST_TIMEOUT,
)
return resp.status_code == 200
def _create_tenant(self, clusters):
resp = requests.put(
f"{self.pulsar_admin_url}/admin/v2/tenants/{self.tenant}",
json={
"adminRoles": [],
"allowedClusters": clusters,
},
timeout=REQUEST_TIMEOUT,
)
if resp.status_code != 204:
raise RuntimeError(
f"Tenant {self.tenant} create failed: "
f"{resp.status_code} {resp.text}"
)
def _namespace_exists(self, namespace):
resp = requests.get(
f"{self.pulsar_admin_url}/admin/v2/namespaces/"
f"{self.tenant}/{namespace}",
timeout=REQUEST_TIMEOUT,
)
return resp.status_code == 200
def _create_namespace(self, namespace, config):
resp = requests.put(
f"{self.pulsar_admin_url}/admin/v2/namespaces/"
f"{self.tenant}/{namespace}",
json=config,
timeout=REQUEST_TIMEOUT,
)
if resp.status_code != 204:
raise RuntimeError(
f"Namespace {self.tenant}/{namespace} create failed: "
f"{resp.status_code} {resp.text}"
)
# ------------------------------------------------------------------
# Reconcile step — offload blocking HTTP to the executor so we
# don't stall the event loop.
# ------------------------------------------------------------------
def _reconcile_sync(self):
"""Do the actual admin-API work on a thread."""
if not self._tenant_exists():
clusters = self._get_clusters()
logger.info(
f"Creating tenant {self.tenant} with clusters {clusters}"
)
self._create_tenant(clusters)
else:
logger.debug(f"Tenant {self.tenant} already exists.")
for namespace, config in NAMESPACE_CONFIG.items():
if self._namespace_exists(namespace):
logger.debug(
f"Namespace {self.tenant}/{namespace} already exists."
)
continue
logger.info(
f"Creating namespace {self.tenant}/{namespace}"
)
self._create_namespace(namespace, config)
async def _reconcile_once(self):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._reconcile_sync)
async def run(self):
logger.info("Pulsar bootstrap loop starting.")
while self.running:
try:
await self._reconcile_once()
except (requests.ConnectionError, requests.Timeout) as e:
logger.info(
f"Pulsar bootstrap: admin API unreachable "
f"({type(e).__name__}: {e}); retry in "
f"{CONNECT_BACKOFF}s"
)
await asyncio.sleep(CONNECT_BACKOFF)
continue
except Exception as e:
logger.error(
f"Pulsar bootstrap failed: "
f"{type(e).__name__}: {e}",
exc_info=True,
)
# Treat as transient, long sleep to avoid hot loop.
await asyncio.sleep(CYCLE_INTERVAL)
@staticmethod
def add_args(parser: ArgumentParser) -> None:
AsyncProcessor.add_args(parser)
parser.add_argument(
'--pulsar-admin-url',
default=default_pulsar_admin_url,
help=f'Pulsar admin REST URL '
f'(default: {default_pulsar_admin_url})',
)
parser.add_argument(
'-t', '--tenant',
default=default_tenant,
help=f'Pulsar tenant to create/maintain '
f'(default: {default_tenant})',
)
def run():
Processor.launch(default_ident, __doc__)