diff --git a/trustgraph-flow/pyproject.toml b/trustgraph-flow/pyproject.toml
index 8ba85adf..fff67c97 100644
--- a/trustgraph-flow/pyproject.toml
+++ b/trustgraph-flow/pyproject.toml
@@ -61,7 +61,9 @@ api-gateway = "trustgraph.gateway:run"
 chunker-recursive = "trustgraph.chunking.recursive:run"
 chunker-token = "trustgraph.chunking.token:run"
 config-svc = "trustgraph.config.service:run"
+config-bootstrap = "trustgraph.bootstrap.config_bootstrap:run"
 flow-svc = "trustgraph.flow.service:run"
+pulsar-bootstrap = "trustgraph.bootstrap.pulsar_bootstrap:run"
 doc-embeddings-query-milvus = "trustgraph.query.doc_embeddings.milvus:run"
 doc-embeddings-query-pinecone = "trustgraph.query.doc_embeddings.pinecone:run"
 doc-embeddings-query-qdrant = "trustgraph.query.doc_embeddings.qdrant:run"
diff --git a/trustgraph-flow/trustgraph/bootstrap/__init__.py b/trustgraph-flow/trustgraph/bootstrap/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/trustgraph-flow/trustgraph/bootstrap/config_bootstrap/__init__.py b/trustgraph-flow/trustgraph/bootstrap/config_bootstrap/__init__.py
new file mode 100644
index 00000000..98f4d9da
--- /dev/null
+++ b/trustgraph-flow/trustgraph/bootstrap/config_bootstrap/__init__.py
@@ -0,0 +1 @@
+from . service import *
diff --git a/trustgraph-flow/trustgraph/bootstrap/config_bootstrap/__main__.py b/trustgraph-flow/trustgraph/bootstrap/config_bootstrap/__main__.py
new file mode 100644
index 00000000..da5a9021
--- /dev/null
+++ b/trustgraph-flow/trustgraph/bootstrap/config_bootstrap/__main__.py
@@ -0,0 +1,6 @@
+#!/usr/bin/env python3
+
+from . service import run
+
+if __name__ == '__main__':
+    run()
diff --git a/trustgraph-flow/trustgraph/bootstrap/config_bootstrap/service.py b/trustgraph-flow/trustgraph/bootstrap/config_bootstrap/service.py
new file mode 100644
index 00000000..bbcb1ca0
--- /dev/null
+++ b/trustgraph-flow/trustgraph/bootstrap/config_bootstrap/service.py
@@ -0,0 +1,484 @@
+"""
+Config bootstrap service.
+
+Runs a slow reconciliation loop that:
+
+  1. Seeds the ``__template__`` workspace from an external JSON seed
+     file on first boot. The seed file has the same shape as
+     ``tg-init-trustgraph``'s input: a top-level dict of
+     ``{ config-type: { config-key: value } }`` where values are the
+     raw (JSON-serialisable) payloads to store.
+
+  2. On first boot, populates a normal workspace (default:
+     ``default``) with the same factory-default content, re-read
+     from the seed file rather than copied out of ``__template__``
+     (which may have been edited since), so a single-tenant system
+     has a working workspace without manual provisioning.
+
+  3. Optionally starts a default flow in that workspace using a
+     specified blueprint.
+
+Each of these three steps is gated on a durable marker stored in the
+reserved ``__system__`` workspace. Markers are set on successful
+completion of their own step; once set they are never touched again,
+so the loop becomes a cheap no-op after first boot. Markers are also
+independent: a partial failure (e.g. default workspace seeded but
+flow-start failed) is resumable on next wake.
+
+Reserved workspaces ``__template__`` and ``__system__`` start with an
+underscore and so are excluded from config push notifications — no
+live service sees their contents.
+
+Cadence:
+  - CYCLE_INTERVAL (300s) on success / steady state
+  - CONNECT_BACKOFF (10s) on connection failure to config-svc
+"""
+
+import argparse
+import asyncio
+import json
+import logging
+import uuid
+from argparse import ArgumentParser
+
+from trustgraph.base import AsyncProcessor
+from trustgraph.base import ProducerMetrics, SubscriberMetrics
+from trustgraph.base.config_client import ConfigClient
+from trustgraph.schema import (
+    ConfigRequest, ConfigResponse,
+    config_request_queue, config_response_queue,
+)
+from trustgraph.schema import (
+    FlowRequest, FlowResponse,
+    flow_request_queue, flow_response_queue,
+)
+from trustgraph.base.request_response_spec import RequestResponse
+
+logger = logging.getLogger(__name__)
+
+default_ident = "config-bootstrap"
+
+# Reserved workspaces
+TEMPLATE_WORKSPACE = "__template__"
+SYSTEM_WORKSPACE = "__system__"
+
+# System marker type + keys
+BOOTSTRAP_MARKER_TYPE = "bootstrap"
+TEMPLATE_SEEDED_KEY = "template-seeded"
+DEFAULT_WORKSPACE_SEEDED_KEY = "default-workspace-seeded"
+DEFAULT_FLOW_STARTED_KEY = "default-flow-started"
+
+# Loop cadence
+CYCLE_INTERVAL = 300
+CONNECT_BACKOFF = 10
+
+default_default_workspace = "default"
+default_default_flow_id = "default"
+default_default_flow_description = "Default"
+
+
+class Processor(AsyncProcessor):
+    """Marker-gated bootstrap steps driven by a slow retry loop."""
+
+    def __init__(self, **params):
+        """Read bootstrap settings from ``params``.
+
+        Raises RuntimeError when ``default_flow_parameters`` is not
+        valid JSON or ``config_file`` is missing.
+        """
+
+        super().__init__(**params)
+
+        self.config_file = params.get("config_file")
+
+        self.default_workspace = params.get(
+            "default_workspace", default_default_workspace,
+        )
+        self.default_flow_id = params.get(
+            "default_flow_id", default_default_flow_id,
+        )
+        self.default_flow_blueprint = params.get(
+            "default_flow_blueprint",
+        )
+        self.default_flow_description = params.get(
+            "default_flow_description", default_default_flow_description,
+        )
+
+        # Optional flow parameters as JSON string
+        default_flow_parameters_raw = params.get("default_flow_parameters")
+        if default_flow_parameters_raw:
+            try:
+                self.default_flow_parameters = json.loads(
+                    default_flow_parameters_raw
+                )
+            except Exception as e:
+                # Chain the cause so the parse error stays in the
+                # traceback.
+                raise RuntimeError(
+                    f"Invalid --default-flow-parameters JSON: {e}"
+                ) from e
+        else:
+            self.default_flow_parameters = {}
+
+        if self.config_file is None:
+            raise RuntimeError(
+                "config_file is required (--config-file)"
+            )
+
+        logger.info(
+            f"Config bootstrap: seed file={self.config_file} "
+            f"default_workspace={self.default_workspace} "
+            f"default_flow_id={self.default_flow_id} "
+            f"default_flow_blueprint={self.default_flow_blueprint or '(none)'}"
+        )
+
+    # ------------------------------------------------------------------
+    # Helpers for talking to config-svc and flow-svc via req/resp.
+    # ------------------------------------------------------------------
+
+    def _make_config_client(self):
+        """Short-lived ConfigClient over the shared pubsub backend."""
+
+        rr_id = str(uuid.uuid4())
+
+        req_metrics = ProducerMetrics(
+            processor=self.id, flow=None, name="config-request",
+        )
+        resp_metrics = SubscriberMetrics(
+            processor=self.id, flow=None, name="config-response",
+        )
+
+        return ConfigClient(
+            backend=self.pubsub_backend,
+            subscription=f"{self.id}--config--{rr_id}",
+            consumer_name=self.id,
+            request_topic=config_request_queue,
+            request_schema=ConfigRequest,
+            request_metrics=req_metrics,
+            response_topic=config_response_queue,
+            response_schema=ConfigResponse,
+            response_metrics=resp_metrics,
+        )
+
+    def _make_flow_client(self):
+        """Short-lived RequestResponse client for flow-svc."""
+
+        rr_id = str(uuid.uuid4())
+
+        req_metrics = ProducerMetrics(
+            processor=self.id, flow=None, name="flow-request",
+        )
+        resp_metrics = SubscriberMetrics(
+            processor=self.id, flow=None, name="flow-response",
+        )
+
+        return RequestResponse(
+            backend=self.pubsub_backend,
+            subscription=f"{self.id}--flow--{rr_id}",
+            consumer_name=self.id,
+            request_topic=flow_request_queue,
+            request_schema=FlowRequest,
+            request_metrics=req_metrics,
+            response_topic=flow_response_queue,
+            response_schema=FlowResponse,
+            response_metrics=resp_metrics,
+        )
+
+    # ------------------------------------------------------------------
+    # Marker helpers. Markers live in __system__/bootstrap/.
+    # ------------------------------------------------------------------
+
+    async def _marker_is_set(self, config, key):
+        """Return True when the marker ``key`` has any stored value."""
+        value = await config.get(
+            SYSTEM_WORKSPACE, BOOTSTRAP_MARKER_TYPE, key,
+        )
+        return value is not None
+
+    async def _marker_set(self, config, key):
+        """Store JSON ``true`` under the marker ``key``."""
+        await config.put(
+            SYSTEM_WORKSPACE, BOOTSTRAP_MARKER_TYPE, key,
+            json.dumps(True),
+        )
+
+    # ------------------------------------------------------------------
+    # Seed data.
+    # ------------------------------------------------------------------
+
+    def _load_seed(self):
+        """Read the seed JSON file from disk.
+
+        Returns ``{type: {key: raw-value}}``.
+
+        Raises RuntimeError if the file cannot be read or its top
+        level is not a JSON object. File errors are re-raised as
+        RuntimeError on purpose: ``run`` handles OSError as a
+        connection problem, and FileNotFoundError / PermissionError
+        are OSError subclasses, so a bad seed path would otherwise be
+        logged as a connect/timeout error.
+        """
+        try:
+            with open(self.config_file, encoding="utf-8") as f:
+                seed = json.load(f)
+        except OSError as e:
+            raise RuntimeError(
+                f"Cannot read seed file {self.config_file}: {e}"
+            ) from e
+
+        if not isinstance(seed, dict):
+            raise RuntimeError(
+                f"Seed file {self.config_file} must hold a JSON object "
+                f"at top level, got {type(seed).__name__}"
+            )
+
+        return seed
+
+    async def _write_config_tree(self, config, workspace, tree):
+        """Write every (type, key, value) pair from a seed tree into
+        the given workspace as a single batched put."""
+        values = [
+            (type_name, key, json.dumps(value))
+            for type_name, entries in tree.items()
+            for key, value in entries.items()
+        ]
+        if values:
+            await config.put_many(workspace, values)
+
+    # ------------------------------------------------------------------
+    # Reconciliation steps.
+    # ------------------------------------------------------------------
+
+    async def _seed_template(self, config):
+        """Step 1: populate __template__ from the seed file if not
+        already seeded. Returns True if seeding was performed."""
+
+        if await self._marker_is_set(config, TEMPLATE_SEEDED_KEY):
+            logger.debug("Template already seeded.")
+            return False
+
+        logger.info(
+            f"Seeding __template__ workspace from {self.config_file}"
+        )
+
+        seed = self._load_seed()
+
+        await self._write_config_tree(config, TEMPLATE_WORKSPACE, seed)
+
+        await self._marker_set(config, TEMPLATE_SEEDED_KEY)
+
+        logger.info("Template seeded.")
+        return True
+
+    async def _bootstrap_default_workspace(self, config):
+        """Step 2: write the seed-file content into the default
+        workspace on first boot. Returns True if it was written."""
+
+        if await self._marker_is_set(
+            config, DEFAULT_WORKSPACE_SEEDED_KEY,
+        ):
+            logger.debug("Default workspace already bootstrapped.")
+            return False
+
+        logger.info(
+            f"Bootstrapping default workspace '{self.default_workspace}' "
+            f"from __template__"
+        )
+
+        # Re-read the seed file so the default workspace is populated
+        # from the same source of truth as __template__. We do not
+        # copy from __template__ via the config service because
+        # __template__ may have been edited after seeding; the seed
+        # file is the factory default.
+        seed = self._load_seed()
+
+        await self._write_config_tree(
+            config, self.default_workspace, seed,
+        )
+
+        await self._marker_set(config, DEFAULT_WORKSPACE_SEEDED_KEY)
+
+        logger.info(
+            f"Default workspace '{self.default_workspace}' bootstrapped."
+        )
+        return True
+
+    async def _start_default_flow(self, config):
+        """Step 3: start a default flow if a blueprint was supplied
+        and the flow has not already been started.
+
+        Returns True if the flow was started. Raises RuntimeError if
+        flow-svc answers with an error; the marker is then left unset
+        so the step is retried on the next cycle.
+        """
+
+        # ``not`` rather than ``is None``: an empty string is logged
+        # as "(none)" at start-up and must be skipped here as well.
+        if not self.default_flow_blueprint:
+            logger.debug(
+                "No default-flow-blueprint configured, skipping."
+            )
+            return False
+
+        if await self._marker_is_set(
+            config, DEFAULT_FLOW_STARTED_KEY,
+        ):
+            logger.debug("Default flow already started.")
+            return False
+
+        logger.info(
+            f"Starting default flow "
+            f"'{self.default_flow_id}' "
+            f"(blueprint '{self.default_flow_blueprint}') "
+            f"in workspace '{self.default_workspace}'"
+        )
+
+        flow_client = self._make_flow_client()
+        try:
+            await flow_client.start()
+
+            req = FlowRequest(
+                operation="start-flow",
+                workspace=self.default_workspace,
+                flow_id=self.default_flow_id,
+                blueprint_name=self.default_flow_blueprint,
+                description=self.default_flow_description,
+                parameters=self.default_flow_parameters,
+            )
+
+            resp = await flow_client.request(req, timeout=30)
+
+            if resp.error:
+                raise RuntimeError(
+                    f"start-flow failed: "
+                    f"{resp.error.type}: {resp.error.message}"
+                )
+
+        finally:
+            # Best-effort teardown: never let a stop() failure mask
+            # the real outcome, but leave a trace for debugging.
+            try:
+                await flow_client.stop()
+            except Exception:
+                logger.debug(
+                    "Ignoring error stopping flow client", exc_info=True,
+                )
+
+        await self._marker_set(config, DEFAULT_FLOW_STARTED_KEY)
+
+        logger.info("Default flow started.")
+        return True
+
+    async def _reconcile_once(self):
+        """Run all three steps in order; each is a cheap no-op once
+        its marker is set."""
+
+        config = self._make_config_client()
+        try:
+            await config.start()
+            await self._seed_template(config)
+            await self._bootstrap_default_workspace(config)
+            await self._start_default_flow(config)
+        finally:
+            try:
+                await config.stop()
+            except Exception:
+                logger.debug(
+                    "Ignoring error stopping config client", exc_info=True,
+                )
+
+    # ------------------------------------------------------------------
+    # Main loop — the processor-group / standalone entry point.
+    # ------------------------------------------------------------------
+
+    async def run(self):
+        """Reconcile repeatedly until ``self.running`` goes false."""
+
+        logger.info("Config bootstrap loop starting.")
+
+        while self.running:
+
+            try:
+                await self._reconcile_once()
+
+            except (
+                ConnectionError, TimeoutError, asyncio.TimeoutError,
+                OSError,
+            ) as e:
+                # asyncio.TimeoutError is listed explicitly: before
+                # Python 3.11 it is not the builtin TimeoutError.
+                logger.info(
+                    f"Bootstrap: connect/timeout error "
+                    f"({type(e).__name__}: {e}); retry in "
+                    f"{CONNECT_BACKOFF}s"
+                )
+                await asyncio.sleep(CONNECT_BACKOFF)
+                continue
+
+            except Exception as e:
+                logger.error(
+                    f"Bootstrap reconcile failed: "
+                    f"{type(e).__name__}: {e}",
+                    exc_info=True,
+                )
+                # Treat as transient, fall through to long sleep so
+                # we don't spin on persistent failures.
+
+            await asyncio.sleep(CYCLE_INTERVAL)
+
+    # ------------------------------------------------------------------
+    # CLI arg plumbing.
+    # ------------------------------------------------------------------
+
+    @staticmethod
+    def add_args(parser: ArgumentParser) -> None:
+        """Register this service's command-line options on ``parser``."""
+
+        AsyncProcessor.add_args(parser)
+
+        parser.add_argument(
+            '-C', '--config-file',
+            required=True,
+            help='Path to the seed JSON config file '
+                 '({type: {key: value}} shape)',
+        )
+
+        parser.add_argument(
+            '-w', '--default-workspace',
+            default=default_default_workspace,
+            help=f'Name of the default workspace to bootstrap '
+                 f'(default: {default_default_workspace})',
+        )
+
+        parser.add_argument(
+            '--default-flow-id',
+            default=default_default_flow_id,
+            help=f'Flow id to start in the default workspace '
+                 f'(default: {default_default_flow_id})',
+        )
+
+        parser.add_argument(
+            '--default-flow-blueprint',
+            default=None,
+            help='Blueprint to use for the default flow. '
+                 'If not set, no default flow is started.',
+        )
+
+        parser.add_argument(
+            '--default-flow-description',
+            default=default_default_flow_description,
+            help=f'Description for the default flow '
+                 f'(default: "{default_default_flow_description}")',
+        )
+
+        parser.add_argument(
+            '--default-flow-parameters',
+            default=None,
+            help='Optional JSON string of parameters to pass to '
+                 'start-flow (e.g. \'{"llm-model":"..."}\').',
+        )
+
+
+def run():
+    """Console-script entry point for ``config-bootstrap``."""
+    Processor.launch(default_ident, __doc__)
diff --git a/trustgraph-flow/trustgraph/bootstrap/pulsar_bootstrap/__init__.py b/trustgraph-flow/trustgraph/bootstrap/pulsar_bootstrap/__init__.py
new file mode 100644
index 00000000..98f4d9da
--- /dev/null
+++ b/trustgraph-flow/trustgraph/bootstrap/pulsar_bootstrap/__init__.py
@@ -0,0 +1 @@
+from . service import *
diff --git a/trustgraph-flow/trustgraph/bootstrap/pulsar_bootstrap/__main__.py b/trustgraph-flow/trustgraph/bootstrap/pulsar_bootstrap/__main__.py
new file mode 100644
index 00000000..da5a9021
--- /dev/null
+++ b/trustgraph-flow/trustgraph/bootstrap/pulsar_bootstrap/__main__.py
@@ -0,0 +1,6 @@
+#!/usr/bin/env python3
+
+from . service import run
+
+if __name__ == '__main__':
+    run()
diff --git a/trustgraph-flow/trustgraph/bootstrap/pulsar_bootstrap/service.py b/trustgraph-flow/trustgraph/bootstrap/pulsar_bootstrap/service.py
new file mode 100644
index 00000000..fcbbb18d
--- /dev/null
+++ b/trustgraph-flow/trustgraph/bootstrap/pulsar_bootstrap/service.py
@@ -0,0 +1,259 @@
+"""
+Pulsar topology bootstrap service.
+
+Runs a slow reconciliation loop that ensures the Pulsar tenant and
+namespaces TrustGraph requires are present, creating them if they
+aren't. All operations are idempotent, so the loop is a cheap no-op
+once the cluster has been initialised.
+
+Only relevant when running on the Pulsar pub/sub backend — for
+RabbitMQ / Kafka this service can simply be omitted from the
+processor group.
+
+Namespaces created:
+  - ``{tenant}/flow`` — data-path topics (durable, no retention limit)
+  - ``{tenant}/request`` — request-class topics
+  - ``{tenant}/response`` — response-class topics (short retention)
+  - ``{tenant}/notify`` — notify topics (very short retention)
+
+This is a thin reconciler around the same admin-API calls
+``tg-init-trustgraph`` has always made; what's new is that it lives
+in a processor group, keeps retrying on failure, and doesn't need a
+dedicated container.
+
+Cadence:
+  - CYCLE_INTERVAL (600s) on success / steady state
+  - CONNECT_BACKOFF (10s) when the admin API is unreachable
+"""
+
+import asyncio
+import logging
+from argparse import ArgumentParser
+
+import requests
+
+from trustgraph.base import AsyncProcessor
+
+logger = logging.getLogger(__name__)
+
+default_ident = "pulsar-bootstrap"
+default_pulsar_admin_url = "http://pulsar:8080"
+default_tenant = "tg"
+
+# Namespace configs. flow/request take broker defaults; response and
+# notify have aggressive retention to keep the broker lean (short-lived
+# reply and notification traffic only).
+# NOTE(review): subscriptionExpirationTimeMinutes is nested under
+# retention_policies here — confirm against the Pulsar namespace
+# policies schema that the broker honours it in this position.
+NAMESPACE_CONFIG = {
+    "flow": {},
+    "request": {},
+    "response": {
+        "retention_policies": {
+            "retentionSizeInMB": -1,
+            "retentionTimeInMinutes": 3,
+            "subscriptionExpirationTimeMinutes": 30,
+        },
+    },
+    "notify": {
+        "retention_policies": {
+            "retentionSizeInMB": -1,
+            "retentionTimeInMinutes": 3,
+            "subscriptionExpirationTimeMinutes": 5,
+        },
+    },
+}
+
+CYCLE_INTERVAL = 600
+CONNECT_BACKOFF = 10
+REQUEST_TIMEOUT = 10
+
+
+class Processor(AsyncProcessor):
+    """Keeps the Pulsar tenant and its namespaces in place."""
+
+    def __init__(self, **params):
+        """Read the admin URL and tenant name from ``params``."""
+
+        super().__init__(**params)
+
+        self.pulsar_admin_url = params.get(
+            "pulsar_admin_url", default_pulsar_admin_url,
+        )
+        self.tenant = params.get("tenant", default_tenant)
+
+        logger.info(
+            f"Pulsar bootstrap: admin={self.pulsar_admin_url} "
+            f"tenant={self.tenant}"
+        )
+
+    # ------------------------------------------------------------------
+    # Admin API helpers — synchronous requests, wrapped so the loop can
+    # treat them like any other awaitable. Running them on the default
+    # executor keeps the event loop responsive.
+    # ------------------------------------------------------------------
+
+    def _get_clusters(self):
+        """Return the decoded JSON body of the clusters endpoint."""
+        resp = requests.get(
+            f"{self.pulsar_admin_url}/admin/v2/clusters",
+            timeout=REQUEST_TIMEOUT,
+        )
+        resp.raise_for_status()
+        return resp.json()
+
+    def _tenant_exists(self):
+        """Return True only when the tenant lookup answers 200."""
+        resp = requests.get(
+            f"{self.pulsar_admin_url}/admin/v2/tenants/{self.tenant}",
+            timeout=REQUEST_TIMEOUT,
+        )
+        return resp.status_code == 200
+
+    def _create_tenant(self, clusters):
+        """Create the tenant, allowed on ``clusters``.
+
+        A 409 (already exists) counts as success so the call stays
+        idempotent if the tenant appears between the existence check
+        and this request. Raises RuntimeError on any other non-204
+        status.
+        """
+        resp = requests.put(
+            f"{self.pulsar_admin_url}/admin/v2/tenants/{self.tenant}",
+            json={
+                "adminRoles": [],
+                "allowedClusters": clusters,
+            },
+            timeout=REQUEST_TIMEOUT,
+        )
+        if resp.status_code == 409:
+            logger.debug(f"Tenant {self.tenant} already exists.")
+            return
+        if resp.status_code != 204:
+            raise RuntimeError(
+                f"Tenant {self.tenant} create failed: "
+                f"{resp.status_code} {resp.text}"
+            )
+
+    def _namespace_exists(self, namespace):
+        """Return True only when the namespace lookup answers 200."""
+        resp = requests.get(
+            f"{self.pulsar_admin_url}/admin/v2/namespaces/"
+            f"{self.tenant}/{namespace}",
+            timeout=REQUEST_TIMEOUT,
+        )
+        return resp.status_code == 200
+
+    def _create_namespace(self, namespace, config):
+        """Create ``namespace`` under the tenant with policy ``config``.
+
+        A 409 (already exists) counts as success, for the same reason
+        as in tenant creation. Raises RuntimeError on any other
+        non-204 status.
+        """
+        resp = requests.put(
+            f"{self.pulsar_admin_url}/admin/v2/namespaces/"
+            f"{self.tenant}/{namespace}",
+            json=config,
+            timeout=REQUEST_TIMEOUT,
+        )
+        if resp.status_code == 409:
+            logger.debug(
+                f"Namespace {self.tenant}/{namespace} already exists."
+            )
+            return
+        if resp.status_code != 204:
+            raise RuntimeError(
+                f"Namespace {self.tenant}/{namespace} create failed: "
+                f"{resp.status_code} {resp.text}"
+            )
+
+    # ------------------------------------------------------------------
+    # Reconcile step — offload blocking HTTP to the executor so we
+    # don't stall the event loop.
+    # ------------------------------------------------------------------
+
+    def _reconcile_sync(self):
+        """Do the actual admin-API work on a thread."""
+
+        if not self._tenant_exists():
+            clusters = self._get_clusters()
+            logger.info(
+                f"Creating tenant {self.tenant} with clusters {clusters}"
+            )
+            self._create_tenant(clusters)
+        else:
+            logger.debug(f"Tenant {self.tenant} already exists.")
+
+        for namespace, config in NAMESPACE_CONFIG.items():
+            if self._namespace_exists(namespace):
+                logger.debug(
+                    f"Namespace {self.tenant}/{namespace} already exists."
+                )
+                continue
+            logger.info(
+                f"Creating namespace {self.tenant}/{namespace}"
+            )
+            self._create_namespace(namespace, config)
+
+    async def _reconcile_once(self):
+        """Run ``_reconcile_sync`` on the default executor."""
+        # Inside a coroutine get_running_loop() is preferred over
+        # get_event_loop().
+        loop = asyncio.get_running_loop()
+        await loop.run_in_executor(None, self._reconcile_sync)
+
+    async def run(self):
+        """Reconcile repeatedly until ``self.running`` goes false."""
+
+        logger.info("Pulsar bootstrap loop starting.")
+
+        while self.running:
+
+            try:
+                await self._reconcile_once()
+
+            except (requests.ConnectionError, requests.Timeout) as e:
+                logger.info(
+                    f"Pulsar bootstrap: admin API unreachable "
+                    f"({type(e).__name__}: {e}); retry in "
+                    f"{CONNECT_BACKOFF}s"
+                )
+                await asyncio.sleep(CONNECT_BACKOFF)
+                continue
+
+            except Exception as e:
+                logger.error(
+                    f"Pulsar bootstrap failed: "
+                    f"{type(e).__name__}: {e}",
+                    exc_info=True,
+                )
+                # Treat as transient, long sleep to avoid hot loop.
+
+            await asyncio.sleep(CYCLE_INTERVAL)
+
+    @staticmethod
+    def add_args(parser: ArgumentParser) -> None:
+        """Register this service's command-line options on ``parser``."""
+
+        AsyncProcessor.add_args(parser)
+
+        parser.add_argument(
+            '--pulsar-admin-url',
+            default=default_pulsar_admin_url,
+            help=f'Pulsar admin REST URL '
+                 f'(default: {default_pulsar_admin_url})',
+        )
+
+        parser.add_argument(
+            '-t', '--tenant',
+            default=default_tenant,
+            help=f'Pulsar tenant to create/maintain '
+                 f'(default: {default_tenant})',
+        )
+
+
+def run():
+    """Console-script entry point for ``pulsar-bootstrap``."""
+    Processor.launch(default_ident, __doc__)