diff --git a/docs/tech-specs/bootstrap.md b/docs/tech-specs/bootstrap.md new file mode 100644 index 00000000..af7387d1 --- /dev/null +++ b/docs/tech-specs/bootstrap.md @@ -0,0 +1,297 @@ +--- +layout: default +title: "Bootstrap Framework Technical Specification" +parent: "Tech Specs" +--- + +# Bootstrap Framework Technical Specification + +## Overview + +A generic, pluggable framework for running one-time initialisation steps +against a TrustGraph deployment — replacing the dedicated +`tg-init-trustgraph` container with a long-running processor that +converges the system to a desired initial state and then idles. + +The framework is content-agnostic. It knows how to run, retry, +mark-as-done, and surface failures; the actual init work lives in +small pluggable classes called **initialisers**. Core initialisers +ship in the `trustgraph-flow` package; enterprise and third-party +initialisers can be loaded by dotted path without any core code +change. + +## Motivation + +The existing `tg-init-trustgraph` is a one-shot CLI run in its own +container. It performs two very different jobs (Pulsar topology +setup and config seeding) in a single script, is wasteful as a whole +container, cannot handle partial-success states, and has no way to +extend the boot process with enterprise-specific concerns (user +provisioning, workspace initialisation, IAM scaffolding) without +forking the tool. + +A pluggable, long-running reconciler addresses all of this and slots +naturally into the existing processor-group model. + +## Design + +### Bootstrapper Processor + +A single `AsyncProcessor` subclass. One entry in a processor group. +Parameters include the processor's own identity and a list of +**initialiser specifications** — each spec names a class (by dotted +path), a unique instance name, a flag string, and the parameters +that will be passed to the initialiser's constructor. + +On each wake the bootstrapper does the following, in order: + +1. 
Open a short-lived context (config client, flow-svc client, + logger). The context is torn down at the end of the wake so + steady-state idle cost is effectively nil. +2. Run all **pre-service initialisers** (those that opt out of the + service gate — principally `PulsarTopology`, which must run + before the services it gates on can even come up). +3. Check the **service gate**: cheap round-trips to config-svc and + flow-svc. If either fails, skip to the sleep step using the + short gate-retry cadence. +4. Run all **post-service initialisers** that haven't already + completed at the currently-configured flag. +5. Sleep. Cadence adapts to state (see below). + +### Initialiser Contract + +An initialiser is a class with: + +- A class-level `name` identifier, unique within the bootstrapper's + configuration. This is the key under which completion state is + stored. +- A class-level `wait_for_services` flag. When `True` (the default) + the initialiser runs only after the service gate passes. When + `False`, it runs before the gate, on every wake. +- A constructor that accepts the initialiser's own params as kwargs. +- An async `run(ctx, old_flag, new_flag)` method that performs the + init work and returns on success. Any raised exception is + logged and treated as a transient failure — the stored flag is + not updated and the initialiser will re-run on the next cycle. + +`old_flag` is the previously-stored flag string, or `None` if the +initialiser has never successfully run in this deployment. `new_flag` +is the flag the operator has configured for this run. This pair +lets an initialiser distinguish a clean first-run from a migration +between flag versions and behave accordingly (see "Flag change and +re-run safety" below). + +### Context + +The context is the bootstrapper-owned object passed to every +initialiser's `run()` method. 
Its fields are deliberately narrow: + +| Field | Purpose | +|---|---| +| `logger` | A child logger named for the initialiser instance | +| `config` | A short-lived `ConfigClient` for config-svc reads/writes | +| `flow` | A short-lived `RequestResponse` client for flow-svc | + +The context is always fully-populated regardless of which services +a given initialiser uses, for symmetry. Additional fields may be +added in future without breaking existing initialisers. Clients are +started at the beginning of a wake cycle and stopped at the end. + +Initialisers that need services beyond config-svc and flow-svc are +responsible for their own readiness checks and for raising cleanly +when a prerequisite is not met. + +### Completion State + +Per-initialiser completion state is stored in the reserved +`__system__` workspace, under a dedicated config type for bootstrap +state. The stored value is the flag string that was configured when +the initialiser last succeeded. + +On each cycle, for each initialiser, the bootstrapper reads the +stored flag and compares it to the currently-configured flag. If +they match, the initialiser is skipped silently. If they differ, +the initialiser runs; on success, the stored flag is updated. + +Because the state lives in a reserved (`_`-prefixed) workspace, it +is stored by config-svc but excluded from the config push broadcast. +Live processors never see it and cannot act on it. + +### The Service Gate + +The gate is a cheap, bootstrapper-internal check that config-svc +and flow-svc are both reachable and responsive. It is intentionally +a simple pair of low-cost round-trips — a config list against +`__system__` and a flow-svc `list-blueprints` — rather than any +deeper health check. + +Its purpose is to avoid filling logs with noise and to concentrate +retry effort during the brief window when services are coming up. 
+The gate is applied only to initialisers with +`wait_for_services=True` (the default); `False` is reserved for +initialisers that set up infrastructure the gate itself depends on. + +### Adaptive Cadence + +The sleep between wake cycles is chosen from three tiers based on +observed state: + +| Tier | Duration | When | +|---|---|---| +| Gate backoff | ~5 s | Services not responding — concentrate retry during startup | +| Init retry | ~15 s | Gate passes but at least one initialiser is not yet at its configured flag — transient failures, waiting on prereqs, recently-bumped flag not yet applied | +| Steady | ~300 s | All configured initialisers at their configured flag; gate passes; nothing to do | + +The short tiers ensure a fresh deployment converges quickly; +steady state costs a single round-trip per initialiser every few +minutes. + +### Failure Handling + +An initialiser raising an exception does not stop the bootstrapper +or block other initialisers. Each initialiser in the cycle is +attempted independently; failures are logged and retried on the next +cycle. This means there is no ordered-DAG enforcement: order of +initialisers in the configuration determines the attempt order +within a cycle, but a dependency between two initialisers is +expressed by the dependant raising cleanly when its prerequisite +isn't satisfied. Over successive cycles the system converges. + +### Flag Change and Re-run Safety + +Each initialiser's completion state is a string flag chosen by the +operator. Typically these follow a simple version pattern +(`v1`, `v2`, ...), but the bootstrapper imposes no format. + +Changing the flag in the group configuration causes the +corresponding initialiser to re-run on the next cycle. Initialisers +must be written so that re-running after a flag bump is safe — they +receive both the previous and the new flag and are responsible for +either cleanly re-applying the work or performing a step-change +migration from the prior state. 
+ +This gives operators an explicit, visible mechanism for triggering +re-initialisation. Re-runs are never implicit. + +## Core Initialisers + +The following initialisers ship in `trustgraph.bootstrap.initialisers` +and cover the base deployment case. + +### PulsarTopology + +Creates the Pulsar tenant and the four namespaces +(`flow`, `request`, `response`, `notify`) with appropriate +retention policies if they don't exist. + +Opts out of the service gate (`wait_for_services = False`) because +config-svc and flow-svc cannot come online until the Pulsar +namespaces exist. + +Parameters: Pulsar admin URL, tenant name. + +Idempotent via the admin API (GET-then-PUT). Flag change causes +re-evaluation of all namespaces; any absent are created. + +### TemplateSeed + +Populates the reserved `__template__` workspace from an external +JSON seed file. The seed file has the standard shape of +`{config-type: {config-key: value}}`. + +Runs post-gate. Parameters: path to the seed file, overwrite +policy (upsert-missing only, or overwrite-all). + +On clean run, writes the whole file. On flag change, behaviour +depends on the overwrite policy — typically upsert-missing so +that operator-customised keys are preserved across seed-file +upgrades. + +### WorkspaceInit + +Creates a named workspace and populates it from the seed file or +from the full contents of the `__template__` workspace. + +Runs post-gate. Parameters: workspace name, source (seed file or +`__template__`), optional `seed_file` path, `overwrite` flag. + +When `source` is `template`, the initialiser copies every config +type and key present in `__template__` — there is no per-type +selection. Deployments that want to seed only a subset should +either curate the seed file they feed to `TemplateSeed` or use +`source: seed-file` directly here. + +Raises cleanly if its source does not exist — depends on +`TemplateSeed` having run in the same cycle or a prior one. 
+ +### DefaultFlowStart + +Starts a specific flow in a specific workspace using a specific +blueprint. + +Runs post-gate. Parameters: workspace name, flow id, blueprint +name, description, optional parameter overrides. + +Separated from `WorkspaceInit` deliberately so that deployments +which want a workspace without an auto-started flow can simply omit +this initialiser from their bootstrap configuration. + +## Extensibility + +New initialisers are added by: + +1. Subclassing the initialiser base class. +2. Implementing `run(ctx, old_flag, new_flag)`. +3. Choosing `wait_for_services` (almost always `True`). +4. Adding an entry in the bootstrapper's configuration with the new + class's dotted path. + +No core code changes are required to add an enterprise or third-party +initialiser. Enterprise builds ship their own package with their own +initialiser classes (e.g. `CreateAdminUser`, `ProvisionWorkspaces`) +and reference them in the bootstrapper config alongside the core +initialisers. + +## Reserved Workspaces + +This specification relies on the "reserved workspace" convention: + +- Any workspace id beginning with `_` is reserved. +- Reserved workspaces are stored normally by config-svc but never + appear in the config push broadcast. +- Live processors cannot react to reserved-workspace state. + +The bootstrapper uses two reserved workspaces: + +- `__template__` — factory-default seed config, readable by + initialisers that copy-from-template. +- `__system__` — bootstrapper completion state (under the + `init-state` config type) and any other system-internal bookkeeping. + +See the reserved-workspace convention in the config service for +the general rule and its enforcement. + +## Non-Goals + +- No DAG scheduling across initialisers. Dependencies are expressed + by the dependant failing cleanly until its prerequisite is met, + and convergence over subsequent cycles. +- No parallel execution of initialisers within a cycle. 
A cycle runs + each initialiser sequentially. +- No implicit re-runs. Re-running an initialiser requires an explicit + flag change by the operator. +- No cross-initialiser atomicity. Each initialiser's completion is + recorded independently on its own success. + +## Operational Notes + +- Running the bootstrapper as a processor-group entry replaces the + previous `tg-init-trustgraph` container. The bootstrapper is also + CLI-invocable directly for standalone testing via + `Processor.launch(...)`. +- First-boot convergence is typically a handful of short cycles + followed by a transition to the steady cadence. Deployments + should expect the first few minutes of logs to show + initialisation activity, thereafter effective silence. +- Bumping a flag is a deliberate operational act. The log line + emitted on re-run makes the event visible for audit. diff --git a/docs/tech-specs/iam.md b/docs/tech-specs/iam.md index 5de50749..cb1399fe 100644 --- a/docs/tech-specs/iam.md +++ b/docs/tech-specs/iam.md @@ -848,7 +848,6 @@ service, not in the config service. Reasons: - **API key scoping.** API keys could be scoped to specific collections within a workspace rather than granting workspace-wide access. To be designed when the need arises. -- **tg-init-trustgraph** only initialises a single workspace. ## References diff --git a/trustgraph-base/trustgraph/base/config_client.py b/trustgraph-base/trustgraph/base/config_client.py index 504a6d58..eb3892f8 100644 --- a/trustgraph-base/trustgraph/base/config_client.py +++ b/trustgraph-base/trustgraph/base/config_client.py @@ -84,6 +84,18 @@ class ConfigClient(RequestResponse): ) return resp.directory + async def get_all(self, workspace, timeout=CONFIG_TIMEOUT): + """Return every config entry in ``workspace`` as a nested dict + ``{type: {key: value}}``. Values are returned as the raw + strings stored by config-svc (typically JSON); callers parse + as needed. 
An empty dict means the workspace has no config.""" + resp = await self._request( + operation="config", + workspace=workspace, + timeout=timeout, + ) + return resp.config + async def workspaces_for_type(self, type, timeout=CONFIG_TIMEOUT): """Return the set of distinct workspaces with any config of the given type.""" diff --git a/trustgraph-cli/pyproject.toml b/trustgraph-cli/pyproject.toml index a5738449..d316ae4f 100644 --- a/trustgraph-cli/pyproject.toml +++ b/trustgraph-cli/pyproject.toml @@ -40,7 +40,6 @@ tg-get-flow-blueprint = "trustgraph.cli.get_flow_blueprint:main" tg-get-kg-core = "trustgraph.cli.get_kg_core:main" tg-get-document-content = "trustgraph.cli.get_document_content:main" tg-graph-to-turtle = "trustgraph.cli.graph_to_turtle:main" -tg-init-trustgraph = "trustgraph.cli.init_trustgraph:main" tg-invoke-agent = "trustgraph.cli.invoke_agent:main" tg-invoke-document-rag = "trustgraph.cli.invoke_document_rag:main" tg-invoke-graph-rag = "trustgraph.cli.invoke_graph_rag:main" diff --git a/trustgraph-cli/trustgraph/cli/init_trustgraph.py b/trustgraph-cli/trustgraph/cli/init_trustgraph.py deleted file mode 100644 index d984f925..00000000 --- a/trustgraph-cli/trustgraph/cli/init_trustgraph.py +++ /dev/null @@ -1,271 +0,0 @@ -""" -Initialises TrustGraph pub/sub infrastructure and pushes initial config. - -For Pulsar: creates tenant, namespaces, and retention policies. -For RabbitMQ: queues are auto-declared, so only config push is needed. 
-""" - -import requests -import time -import argparse -import json - -from trustgraph.clients.config_client import ConfigClient -from trustgraph.base.pubsub import add_pubsub_args - -default_pulsar_admin_url = "http://pulsar:8080" -subscriber = "tg-init-pubsub" - - -def get_clusters(url): - - print("Get clusters...", flush=True) - - resp = requests.get(f"{url}/admin/v2/clusters") - - if resp.status_code != 200: raise RuntimeError("Could not fetch clusters") - - return resp.json() - -def ensure_tenant(url, tenant, clusters): - - resp = requests.get(f"{url}/admin/v2/tenants/{tenant}") - - if resp.status_code == 200: - print(f"Tenant {tenant} already exists.", flush=True) - return - - resp = requests.put( - f"{url}/admin/v2/tenants/{tenant}", - json={ - "adminRoles": [], - "allowedClusters": clusters, - } - ) - - if resp.status_code != 204: - print(resp.text, flush=True) - raise RuntimeError("Tenant creation failed.") - - print(f"Tenant {tenant} created.", flush=True) - -def ensure_namespace(url, tenant, namespace, config): - - resp = requests.get(f"{url}/admin/v2/namespaces/{tenant}/{namespace}") - - if resp.status_code == 200: - print(f"Namespace {tenant}/{namespace} already exists.", flush=True) - return - - resp = requests.put( - f"{url}/admin/v2/namespaces/{tenant}/{namespace}", - json=config, - ) - - if resp.status_code != 204: - print(resp.status_code, flush=True) - print(resp.text, flush=True) - raise RuntimeError(f"Namespace {tenant}/{namespace} creation failed.") - - print(f"Namespace {tenant}/{namespace} created.", flush=True) - -def ensure_config(config, workspace="default", **pubsub_config): - - cli = ConfigClient( - subscriber=subscriber, - workspace=workspace, - **pubsub_config, - ) - - while True: - - try: - - print("Get current config...", flush=True) - current, version = cli.config(timeout=5) - - except Exception as e: - - print("Exception:", e, flush=True) - time.sleep(2) - print("Retrying...", flush=True) - continue - - print("Current config 
version is", version, flush=True) - - if version != 0: - print("Already updated, not updating config. Done.", flush=True) - return - - print("Config is version 0, updating...", flush=True) - - batch = [] - - for type in config: - for key in config[type]: - print(f"Adding {type}/{key} to update.", flush=True) - batch.append({ - "type": type, - "key": key, - "value": json.dumps(config[type][key]), - }) - - try: - cli.put(batch, timeout=10) - print("Update succeeded.", flush=True) - break - except Exception as e: - print("Exception:", e, flush=True) - time.sleep(2) - print("Retrying...", flush=True) - continue - -def init_pulsar(pulsar_admin_url, tenant): - """Pulsar-specific setup: create tenant, namespaces, retention policies.""" - - clusters = get_clusters(pulsar_admin_url) - - ensure_tenant(pulsar_admin_url, tenant, clusters) - - ensure_namespace(pulsar_admin_url, tenant, "flow", {}) - - ensure_namespace(pulsar_admin_url, tenant, "request", {}) - - ensure_namespace(pulsar_admin_url, tenant, "response", { - "retention_policies": { - "retentionSizeInMB": -1, - "retentionTimeInMinutes": 3, - "subscriptionExpirationTimeMinutes": 30, - } - }) - - ensure_namespace(pulsar_admin_url, tenant, "notify", { - "retention_policies": { - "retentionSizeInMB": -1, - "retentionTimeInMinutes": 3, - "subscriptionExpirationTimeMinutes": 5, - } - }) - - -def push_config(config_json, config_file, workspace="default", - **pubsub_config): - """Push initial config if provided.""" - - if config_json is not None: - - try: - print("Decoding config...", flush=True) - dec = json.loads(config_json) - print("Decoded.", flush=True) - except Exception as e: - print("Exception:", e, flush=True) - raise e - - ensure_config(dec, workspace=workspace, **pubsub_config) - - elif config_file is not None: - - try: - print("Decoding config...", flush=True) - dec = json.load(open(config_file)) - print("Decoded.", flush=True) - except Exception as e: - print("Exception:", e, flush=True) - raise e - - 
ensure_config(dec, workspace=workspace, **pubsub_config) - - else: - print("No config to update.", flush=True) - - -def main(): - - parser = argparse.ArgumentParser( - prog='tg-init-trustgraph', - description=__doc__, - ) - - parser.add_argument( - '--pulsar-admin-url', - default=default_pulsar_admin_url, - help=f'Pulsar admin URL (default: {default_pulsar_admin_url})', - ) - - parser.add_argument( - '-c', '--config', - help=f'Initial configuration to load', - ) - - parser.add_argument( - '-C', '--config-file', - help=f'Initial configuration to load from file', - ) - - parser.add_argument( - '-t', '--tenant', - default="tg", - help=f'Tenant (default: tg)', - ) - - parser.add_argument( - '-w', '--workspace', - default="default", - help=f'Workspace (default: default)', - ) - - add_pubsub_args(parser) - - args = parser.parse_args() - - backend_type = args.pubsub_backend - - # Extract pubsub config from args - pubsub_config = { - k: v for k, v in vars(args).items() - if k not in ( - 'pulsar_admin_url', 'config', 'config_file', 'tenant', - 'workspace', - ) - } - - while True: - - try: - - # Pulsar-specific setup (tenants, namespaces) - if backend_type == 'pulsar': - print(flush=True) - print( - f"Initialising Pulsar at {args.pulsar_admin_url}...", - flush=True, - ) - init_pulsar(args.pulsar_admin_url, args.tenant) - else: - print(flush=True) - print( - f"Using {backend_type} backend (no admin setup needed).", - flush=True, - ) - - # Push config (works with any backend) - push_config( - args.config, args.config_file, - workspace=args.workspace, - **pubsub_config, - ) - - print("Initialisation complete.", flush=True) - break - - except Exception as e: - - print("Exception:", e, flush=True) - - print("Sleeping...", flush=True) - time.sleep(2) - print("Will retry...", flush=True) - -if __name__ == "__main__": - main() diff --git a/trustgraph-flow/pyproject.toml b/trustgraph-flow/pyproject.toml index 8ba85adf..cc7dac63 100644 --- a/trustgraph-flow/pyproject.toml +++ 
b/trustgraph-flow/pyproject.toml @@ -60,6 +60,7 @@ agent-orchestrator = "trustgraph.agent.orchestrator:run" api-gateway = "trustgraph.gateway:run" chunker-recursive = "trustgraph.chunking.recursive:run" chunker-token = "trustgraph.chunking.token:run" +bootstrap = "trustgraph.bootstrap.bootstrapper:run" config-svc = "trustgraph.config.service:run" flow-svc = "trustgraph.flow.service:run" doc-embeddings-query-milvus = "trustgraph.query.doc_embeddings.milvus:run" diff --git a/trustgraph-flow/trustgraph/bootstrap/__init__.py b/trustgraph-flow/trustgraph/bootstrap/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph-flow/trustgraph/bootstrap/base.py b/trustgraph-flow/trustgraph/bootstrap/base.py new file mode 100644 index 00000000..cb022a16 --- /dev/null +++ b/trustgraph-flow/trustgraph/bootstrap/base.py @@ -0,0 +1,68 @@ +""" +Bootstrap framework: Initialiser base class and per-wake context. + +See docs/tech-specs/bootstrap.md for the full design. +""" + +import logging +from dataclasses import dataclass +from typing import Any + + +@dataclass +class InitContext: + """Shared per-wake context passed to each initialiser. + + The bootstrapper constructs one of these on every wake cycle, + tears it down at cycle end, and passes it into each initialiser's + ``run()`` method. Fields are short-lived and safe to use during + a single cycle only. + """ + + logger: logging.Logger + config: Any # ConfigClient + flow: Any # RequestResponse client for flow-svc + + +class Initialiser: + """Base class for bootstrap initialisers. + + Subclasses implement :meth:`run`. The bootstrapper manages + completion state, flag comparison, retry and error handling — + subclasses describe only the work to perform. + + Class attributes: + + * ``wait_for_services`` (bool, default ``True``): when ``True`` the + initialiser only runs after the bootstrapper's service gate has + passed (config-svc and flow-svc reachable). 
Set ``False`` for + initialisers that bring up infrastructure the gate itself + depends on — principally Pulsar topology, without which + config-svc cannot come online. + """ + + wait_for_services: bool = True + + def __init__(self, **params): + # Subclasses should consume their own params via keyword + # arguments in their own __init__ signatures. This catch-all + # is here so any kwargs that filter through unnoticed don't + # raise TypeError on construction. + pass + + async def run(self, ctx, old_flag, new_flag): + """Perform initialisation work. + + :param ctx: :class:`InitContext` with logger, config client, + flow-svc client. + :param old_flag: Previously-stored flag string, or ``None`` if + this initialiser has never successfully completed in this + deployment. + :param new_flag: Currently-configured flag. A string chosen + by the operator; typically something like ``"v1"``. + + :raises: Any exception on failure. The bootstrapper catches, + logs, and re-runs on the next cycle; completion state is + only written on clean return. + """ + raise NotImplementedError diff --git a/trustgraph-flow/trustgraph/bootstrap/bootstrapper/__init__.py b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/__init__.py new file mode 100644 index 00000000..98f4d9da --- /dev/null +++ b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/__init__.py @@ -0,0 +1 @@ +from . service import * diff --git a/trustgraph-flow/trustgraph/bootstrap/bootstrapper/__main__.py b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/__main__.py new file mode 100644 index 00000000..da5a9021 --- /dev/null +++ b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/__main__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from . 
service import run + +if __name__ == '__main__': + run() diff --git a/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py new file mode 100644 index 00000000..eb6238d3 --- /dev/null +++ b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py @@ -0,0 +1,414 @@ +""" +Bootstrapper processor. + +Runs a pluggable list of initialisers in a reconciliation loop. +Each initialiser's completion state is recorded in the reserved +``__system__`` workspace under the ``init-state`` config type. + +See docs/tech-specs/bootstrap.md for the full design. +""" + +import asyncio +import importlib +import json +import logging +import uuid +from argparse import ArgumentParser +from dataclasses import dataclass + +from trustgraph.base import AsyncProcessor +from trustgraph.base import ProducerMetrics, SubscriberMetrics +from trustgraph.base.config_client import ConfigClient +from trustgraph.base.request_response_spec import RequestResponse +from trustgraph.schema import ( + ConfigRequest, ConfigResponse, + config_request_queue, config_response_queue, +) +from trustgraph.schema import ( + FlowRequest, FlowResponse, + flow_request_queue, flow_response_queue, +) + +from .. base import Initialiser, InitContext + +logger = logging.getLogger(__name__) + +default_ident = "bootstrap" + +# Reserved workspace + config type under which completion state is +# stored. Reserved (`_`-prefix) workspaces are excluded from the +# config push broadcast — live processors never see these keys. +SYSTEM_WORKSPACE = "__system__" +INIT_STATE_TYPE = "init-state" + +# Cadence tiers. +GATE_BACKOFF = 5 # Services not responding; retry soon. +INIT_RETRY = 15 # Gate passed but something ran/failed; + # converge quickly. +STEADY_INTERVAL = 300 # Everything at target flag; idle cheaply. 
+ + +@dataclass +class InitialiserSpec: + """One entry in the bootstrapper's configured list of initialisers.""" + name: str + flag: str + instance: Initialiser + + +def _resolve_class(dotted): + """Import and return a class by its dotted path.""" + module_path, _, class_name = dotted.rpartition(".") + if not module_path: + raise ValueError( + f"Initialiser class must be a dotted path, got {dotted!r}" + ) + module = importlib.import_module(module_path) + return getattr(module, class_name) + + +def _load_initialisers_file(path): + """Load the initialisers spec list from a YAML or JSON file. + + File shape: + + .. code-block:: yaml + + initialisers: + - class: trustgraph.bootstrap.initialisers.PulsarTopology + name: pulsar-topology + flag: v1 + params: + admin_url: http://pulsar:8080 + tenant: tg + - ... + """ + with open(path) as f: + content = f.read() + if path.endswith((".yaml", ".yml")): + import yaml + doc = yaml.safe_load(content) + else: + doc = json.loads(content) + if not isinstance(doc, dict) or "initialisers" not in doc: + raise RuntimeError( + f"{path}: expected a mapping with an 'initialisers' key" + ) + return doc["initialisers"] + + +class Processor(AsyncProcessor): + + def __init__(self, **params): + + super().__init__(**params) + + # Source the initialisers list either from a direct parameter + # (processor-group embedding) or from a file (CLI launch). 
+ inits = params.get("initialisers") + if inits is None: + inits_file = params.get("initialisers_file") + if inits_file is None: + raise RuntimeError( + "Bootstrapper requires either the 'initialisers' " + "parameter or --initialisers-file" + ) + inits = _load_initialisers_file(inits_file) + + self.specs = [] + names = set() + + for entry in inits: + if not isinstance(entry, dict): + raise RuntimeError( + f"Initialiser entry must be a mapping, got: {entry!r}" + ) + for required in ("class", "name", "flag"): + if required not in entry: + raise RuntimeError( + f"Initialiser entry missing required field " + f"{required!r}: {entry!r}" + ) + + name = entry["name"] + if name in names: + raise RuntimeError(f"Duplicate initialiser name {name!r}") + names.add(name) + + cls = _resolve_class(entry["class"]) + + try: + instance = cls(**entry.get("params", {})) + except Exception as e: + raise RuntimeError( + f"Failed to instantiate initialiser " + f"{entry['class']!r} as {name!r}: " + f"{type(e).__name__}: {e}" + ) + + self.specs.append(InitialiserSpec( + name=name, + flag=entry["flag"], + instance=instance, + )) + + logger.info( + f"Bootstrapper: loaded {len(self.specs)} initialisers" + ) + + # ------------------------------------------------------------------ + # Client construction (short-lived per wake cycle). 
+ # ------------------------------------------------------------------ + + def _make_config_client(self): + rr_id = str(uuid.uuid4()) + return ConfigClient( + backend=self.pubsub_backend, + subscription=f"{self.id}--config--{rr_id}", + consumer_name=self.id, + request_topic=config_request_queue, + request_schema=ConfigRequest, + request_metrics=ProducerMetrics( + processor=self.id, flow=None, name="config-request", + ), + response_topic=config_response_queue, + response_schema=ConfigResponse, + response_metrics=SubscriberMetrics( + processor=self.id, flow=None, name="config-response", + ), + ) + + def _make_flow_client(self): + rr_id = str(uuid.uuid4()) + return RequestResponse( + backend=self.pubsub_backend, + subscription=f"{self.id}--flow--{rr_id}", + consumer_name=self.id, + request_topic=flow_request_queue, + request_schema=FlowRequest, + request_metrics=ProducerMetrics( + processor=self.id, flow=None, name="flow-request", + ), + response_topic=flow_response_queue, + response_schema=FlowResponse, + response_metrics=SubscriberMetrics( + processor=self.id, flow=None, name="flow-response", + ), + ) + + async def _open_clients(self): + config = self._make_config_client() + flow = self._make_flow_client() + await config.start() + try: + await flow.start() + except Exception: + await self._safe_stop(config) + raise + return config, flow + + async def _safe_stop(self, client): + try: + await client.stop() + except Exception: + pass + + # ------------------------------------------------------------------ + # Service gate. 
+ # ------------------------------------------------------------------ + + async def _gate_ready(self, config, flow): + try: + await config.keys(SYSTEM_WORKSPACE, INIT_STATE_TYPE) + except Exception as e: + logger.info( + f"Gate: config-svc not ready ({type(e).__name__}: {e})" + ) + return False + + try: + resp = await flow.request( + FlowRequest( + operation="list-blueprints", + workspace=SYSTEM_WORKSPACE, + ), + timeout=5, + ) + if resp.error: + logger.info( + f"Gate: flow-svc error: " + f"{resp.error.type}: {resp.error.message}" + ) + return False + except Exception as e: + logger.info( + f"Gate: flow-svc not ready ({type(e).__name__}: {e})" + ) + return False + + return True + + # ------------------------------------------------------------------ + # Completion state. + # ------------------------------------------------------------------ + + async def _stored_flag(self, config, name): + raw = await config.get(SYSTEM_WORKSPACE, INIT_STATE_TYPE, name) + if raw is None: + return None + try: + return json.loads(raw) + except Exception: + return raw + + async def _store_flag(self, config, name, flag): + await config.put( + SYSTEM_WORKSPACE, INIT_STATE_TYPE, name, + json.dumps(flag), + ) + + # ------------------------------------------------------------------ + # Per-spec execution. + # ------------------------------------------------------------------ + + async def _run_spec(self, spec, config, flow): + """Run a single initialiser spec. + + Returns one of: + - ``"skip"``: stored flag already matches target, nothing to do. + - ``"ran"``: initialiser ran and completion state was updated. + - ``"failed"``: initialiser raised. + - ``"failed-state-write"``: initialiser succeeded but we could + not persist the new flag (transient — will re-run next cycle). 
+ """ + + try: + old_flag = await self._stored_flag(config, spec.name) + except Exception as e: + logger.warning( + f"{spec.name}: could not read stored flag " + f"({type(e).__name__}: {e})" + ) + return "failed" + + if old_flag == spec.flag: + return "skip" + + child_logger = logger.getChild(spec.name) + child_ctx = InitContext( + logger=child_logger, + config=config, + flow=flow, + ) + + child_logger.info( + f"Running (old_flag={old_flag!r} -> new_flag={spec.flag!r})" + ) + + try: + await spec.instance.run(child_ctx, old_flag, spec.flag) + except Exception as e: + child_logger.error( + f"Failed: {type(e).__name__}: {e}", exc_info=True, + ) + return "failed" + + try: + await self._store_flag(config, spec.name, spec.flag) + except Exception as e: + child_logger.warning( + f"Completed but could not persist state flag " + f"({type(e).__name__}: {e}); will re-run next cycle" + ) + return "failed-state-write" + + child_logger.info(f"Completed (flag={spec.flag!r})") + return "ran" + + # ------------------------------------------------------------------ + # Main loop. + # ------------------------------------------------------------------ + + async def run(self): + + logger.info( + f"Bootstrapper starting with {len(self.specs)} initialisers" + ) + + while self.running: + + sleep_for = STEADY_INTERVAL + + try: + config, flow = await self._open_clients() + except Exception as e: + logger.info( + f"Failed to open clients " + f"({type(e).__name__}: {e}); retry in {GATE_BACKOFF}s" + ) + await asyncio.sleep(GATE_BACKOFF) + continue + + try: + # Phase 1: pre-service initialisers run unconditionally. + pre_specs = [ + s for s in self.specs + if not s.instance.wait_for_services + ] + pre_results = {} + for spec in pre_specs: + pre_results[spec.name] = await self._run_spec( + spec, config, flow, + ) + + # Phase 2: gate. + gate_ok = await self._gate_ready(config, flow) + + # Phase 3: post-service initialisers, if gate passed. 
+ post_results = {} + if gate_ok: + post_specs = [ + s for s in self.specs + if s.instance.wait_for_services + ] + for spec in post_specs: + post_results[spec.name] = await self._run_spec( + spec, config, flow, + ) + + # Cadence selection. + if not gate_ok: + sleep_for = GATE_BACKOFF + else: + all_results = {**pre_results, **post_results} + if any(r != "skip" for r in all_results.values()): + sleep_for = INIT_RETRY + else: + sleep_for = STEADY_INTERVAL + + finally: + await self._safe_stop(config) + await self._safe_stop(flow) + + await asyncio.sleep(sleep_for) + + # ------------------------------------------------------------------ + # CLI arg plumbing. + # ------------------------------------------------------------------ + + @staticmethod + def add_args(parser: ArgumentParser) -> None: + + AsyncProcessor.add_args(parser) + + parser.add_argument( + '-c', '--initialisers-file', + help='Path to YAML or JSON file describing the ' + 'initialisers to run. Ignored when the ' + "'initialisers' parameter is provided directly " + '(e.g. when running inside a processor group).', + ) + + +def run(): + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/bootstrap/initialisers/__init__.py b/trustgraph-flow/trustgraph/bootstrap/initialisers/__init__.py new file mode 100644 index 00000000..6171eb02 --- /dev/null +++ b/trustgraph-flow/trustgraph/bootstrap/initialisers/__init__.py @@ -0,0 +1,20 @@ +""" +Core bootstrap initialisers. + +These cover the base TrustGraph deployment case. Enterprise or +third-party initialisers live in their own packages and are +referenced in the bootstrapper's config by fully-qualified dotted +path. +""" + +from . pulsar_topology import PulsarTopology +from . template_seed import TemplateSeed +from . workspace_init import WorkspaceInit +from . 
class DefaultFlowStart(Initialiser):
    """Start a named flow in a workspace from a given blueprint.

    Kept separate from WorkspaceInit so deployments that want a
    workspace without an auto-started flow can simply omit this
    initialiser.

    Parameters
    ----------
    workspace : str (default "default")
        Workspace in which to start the flow.
    flow_id : str (default "default")
        Identifier for the started flow.
    blueprint : str (required)
        Blueprint name (must already exist in the workspace's config,
        typically via TemplateSeed -> WorkspaceInit).
    description : str (default "Default")
        Human-readable description passed to flow-svc.
    parameters : dict (optional)
        Optional parameter overrides passed to start-flow.
    """

    def __init__(
        self,
        workspace="default",
        flow_id="default",
        blueprint=None,
        description="Default",
        parameters=None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        if not blueprint:
            raise ValueError(
                "DefaultFlowStart requires 'blueprint'"
            )
        self.workspace = workspace
        self.flow_id = flow_id
        self.blueprint = blueprint
        self.description = description
        # Copy so later caller-side mutation can't leak into requests.
        self.parameters = dict(parameters) if parameters else {}

    async def run(self, ctx, old_flag, new_flag):
        """Start the flow unless it is already running."""

        # Belt-and-braces beyond the flag gate: if an operator stops
        # and restarts the bootstrapper after the flow is already
        # running, we don't want to blindly try to start it again.
        if await self._already_running(ctx):
            ctx.logger.info(
                f"Flow {self.flow_id!r} already running in workspace "
                f"{self.workspace!r}; nothing to do"
            )
            return

        ctx.logger.info(
            f"Starting flow {self.flow_id!r} "
            f"(blueprint={self.blueprint!r}) "
            f"in workspace {self.workspace!r}"
        )

        await self._start(ctx)

        ctx.logger.info(
            f"Flow {self.flow_id!r} started"
        )

    async def _already_running(self, ctx):
        """True when flow_id appears in the workspace's running flows."""
        resp = await ctx.flow.request(
            FlowRequest(
                operation="list-flows",
                workspace=self.workspace,
            ),
            timeout=10,
        )
        if resp.error:
            raise RuntimeError(
                f"list-flows failed: "
                f"{resp.error.type}: {resp.error.message}"
            )
        return self.flow_id in (resp.flow_ids or [])

    async def _start(self, ctx):
        """Issue start-flow; raise on a service-reported error."""
        resp = await ctx.flow.request(
            FlowRequest(
                operation="start-flow",
                workspace=self.workspace,
                flow_id=self.flow_id,
                blueprint_name=self.blueprint,
                description=self.description,
                parameters=self.parameters,
            ),
            timeout=30,
        )
        if resp.error:
            raise RuntimeError(
                f"start-flow failed: "
                f"{resp.error.type}: {resp.error.message}"
            )
# Namespace configs. flow/request take broker defaults. response
# and notify get aggressive retention — those classes carry short-lived
# request/response and notification traffic only.
NAMESPACE_CONFIG = {
    "flow": {},
    "request": {},
    "response": {
        "retention_policies": {
            "retentionSizeInMB": -1,
            "retentionTimeInMinutes": 3,
            "subscriptionExpirationTimeMinutes": 30,
        },
    },
    "notify": {
        "retention_policies": {
            "retentionSizeInMB": -1,
            "retentionTimeInMinutes": 3,
            "subscriptionExpirationTimeMinutes": 5,
        },
    },
}

# Per-call timeout (seconds) for Pulsar admin-API HTTP requests.
REQUEST_TIMEOUT = 10


class PulsarTopology(Initialiser):
    """Create the Pulsar tenant and namespaces with retention policies.

    Runs pre-gate (``wait_for_services = False``) because config-svc
    and flow-svc can't connect to Pulsar until these namespaces exist.
    Admin-API calls are idempotent so re-runs on flag change are safe.

    Parameters
    ----------
    admin_url : str (default "http://pulsar:8080")
        Base URL of the Pulsar admin API; trailing slash is stripped.
    tenant : str (default "tg")
        Pulsar tenant under which the namespaces are created.
    """

    wait_for_services = False

    def __init__(
        self,
        admin_url="http://pulsar:8080",
        tenant="tg",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.admin_url = admin_url.rstrip("/")
        self.tenant = tenant

    async def run(self, ctx, old_flag, new_flag):
        """Converge tenant + namespaces against NAMESPACE_CONFIG.

        ``requests`` is blocking, so the reconcile runs in the default
        executor to keep the event loop responsive.
        """
        # FIX: get_event_loop() inside a coroutine is deprecated since
        # Python 3.10; get_running_loop() is the correct call here.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._reconcile_sync, ctx.logger)

    # ------------------------------------------------------------------
    # Sync admin-API calls.
    # ------------------------------------------------------------------

    def _get_clusters(self):
        """Return the broker's cluster list (needed to create a tenant)."""
        resp = requests.get(
            f"{self.admin_url}/admin/v2/clusters",
            timeout=REQUEST_TIMEOUT,
        )
        resp.raise_for_status()
        return resp.json()

    def _tenant_exists(self):
        """True when the tenant is already present (HTTP 200)."""
        resp = requests.get(
            f"{self.admin_url}/admin/v2/tenants/{self.tenant}",
            timeout=REQUEST_TIMEOUT,
        )
        return resp.status_code == 200

    def _create_tenant(self, clusters):
        """Create the tenant; Pulsar answers 204 on success."""
        resp = requests.put(
            f"{self.admin_url}/admin/v2/tenants/{self.tenant}",
            json={"adminRoles": [], "allowedClusters": clusters},
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code != 204:
            raise RuntimeError(
                f"Tenant {self.tenant!r} create failed: "
                f"{resp.status_code} {resp.text}"
            )

    def _namespace_exists(self, namespace):
        """True when tenant/namespace is already present (HTTP 200)."""
        resp = requests.get(
            f"{self.admin_url}/admin/v2/namespaces/"
            f"{self.tenant}/{namespace}",
            timeout=REQUEST_TIMEOUT,
        )
        return resp.status_code == 200

    def _create_namespace(self, namespace, config):
        """Create tenant/namespace with *config*; 204 expected."""
        resp = requests.put(
            f"{self.admin_url}/admin/v2/namespaces/"
            f"{self.tenant}/{namespace}",
            json=config,
            timeout=REQUEST_TIMEOUT,
        )
        if resp.status_code != 204:
            raise RuntimeError(
                f"Namespace {self.tenant}/{namespace} create failed: "
                f"{resp.status_code} {resp.text}"
            )

    def _reconcile_sync(self, logger):
        """Idempotently converge tenant then namespaces (blocking)."""
        if not self._tenant_exists():
            clusters = self._get_clusters()
            logger.info(
                f"Creating tenant {self.tenant!r} with clusters {clusters}"
            )
            self._create_tenant(clusters)
        else:
            logger.debug(f"Tenant {self.tenant!r} already exists")

        for namespace, config in NAMESPACE_CONFIG.items():
            if self._namespace_exists(namespace):
                logger.debug(
                    f"Namespace {self.tenant}/{namespace} already exists"
                )
                continue
            logger.info(
                f"Creating namespace {self.tenant}/{namespace}"
            )
            self._create_namespace(namespace, config)
TEMPLATE_WORKSPACE = "__template__"


class TemplateSeed(Initialiser):
    """Populate the reserved ``__template__`` workspace from a JSON seed.

    Seed file shape: top-level keys are config types; nested keys are
    config entries. Values are arbitrary JSON (``json.dumps()``'d on
    write).

    Parameters
    ----------
    config_file : str
        Path to the seed file on disk.
    overwrite : bool (default False)
        On re-run (flag change), if True overwrite all keys; if False
        upsert-missing-only (preserves any operator customisation of
        the template).
    """

    def __init__(self, config_file, overwrite=False, **kwargs):
        super().__init__(**kwargs)
        if not config_file:
            raise ValueError("TemplateSeed requires 'config_file'")
        self.config_file = config_file
        self.overwrite = overwrite

    async def run(self, ctx, old_flag, new_flag):
        """Seed the template; write-all on first run or when overwriting."""

        with open(self.config_file) as f:
            seed = json.load(f)

        # A clean first run (no stored flag) always writes everything;
        # re-runs after a flag change respect the overwrite setting.
        if old_flag is None or self.overwrite:
            await self._write_all(ctx, seed)
        else:
            await self._upsert_missing(ctx, seed)

    async def _write_all(self, ctx, seed):
        """Write every seed entry, clobbering whatever is present."""
        rows = [
            (type_name, key, json.dumps(value))
            for type_name, entries in seed.items()
            for key, value in entries.items()
        ]
        if rows:
            await ctx.config.put_many(TEMPLATE_WORKSPACE, rows)
        ctx.logger.info(
            f"Template seeded with {len(rows)} entries"
        )

    async def _upsert_missing(self, ctx, seed):
        """Write only the keys not already present in the template."""
        written = 0
        for type_name, entries in seed.items():
            existing = set(
                await ctx.config.keys(TEMPLATE_WORKSPACE, type_name)
            )
            rows = [
                (type_name, key, json.dumps(value))
                for key, value in entries.items()
                if key not in existing
            ]
            if rows:
                await ctx.config.put_many(TEMPLATE_WORKSPACE, rows)
                written += len(rows)
        ctx.logger.info(
            f"Template upsert-missing: {written} new entries"
        )
TEMPLATE_WORKSPACE = "__template__"


class WorkspaceInit(Initialiser):
    """Create a workspace and populate it from the template or a seed file.

    Parameters
    ----------
    workspace : str
        Target workspace to create / populate.
    source : str
        Either ``"template"`` (copy the full contents of the
        ``__template__`` workspace) or ``"seed-file"`` (read from
        ``seed_file``).
    seed_file : str (required when source == "seed-file")
        Path to a JSON seed file, same shape TemplateSeed consumes.
    overwrite : bool (default False)
        On re-run (flag change), if True overwrite all keys; if False,
        upsert-missing-only (preserves in-workspace customisations).

    When source is ``"template"``, ``run`` raises ``RuntimeError`` if
    the ``__template__`` workspace is empty — i.e. TemplateSeed hasn't
    run yet; the bootstrapper retries on the next cycle.
    """

    def __init__(
        self,
        workspace="default",
        source="template",
        seed_file=None,
        overwrite=False,
        **kwargs,
    ):
        super().__init__(**kwargs)

        if source not in ("template", "seed-file"):
            raise ValueError(
                f"WorkspaceInit: source must be 'template' or "
                f"'seed-file', got {source!r}"
            )
        if source == "seed-file" and not seed_file:
            raise ValueError(
                "WorkspaceInit: seed_file required when source='seed-file'"
            )

        self.workspace = workspace
        self.source = source
        self.seed_file = seed_file
        self.overwrite = overwrite

    async def run(self, ctx, old_flag, new_flag):
        """Populate the workspace; write-all on first run or overwrite."""
        if self.source == "seed-file":
            tree = self._load_seed_file()
        else:
            tree = await self._load_from_template(ctx)

        if old_flag is None or self.overwrite:
            await self._write_all(ctx, tree)
        else:
            await self._upsert_missing(ctx, tree)

    def _load_seed_file(self):
        """Read the JSON seed tree from disk."""
        with open(self.seed_file) as f:
            return json.load(f)

    async def _load_from_template(self, ctx):
        """Build a seed tree from the entire ``__template__`` workspace.

        Raises when the workspace is empty, so the bootstrapper knows
        the TemplateSeed prerequisite isn't met yet.
        """

        raw_tree = await ctx.config.get_all(TEMPLATE_WORKSPACE)

        tree = {}
        total = 0
        for type_name, entries in raw_tree.items():
            parsed = {}
            for key, raw in entries.items():
                if raw is None:
                    continue
                try:
                    value = json.loads(raw)
                except Exception:
                    # Stored value wasn't JSON; carry it through as-is.
                    value = raw
                parsed[key] = value
                total += 1
            if parsed:
                tree[type_name] = parsed

        if total == 0:
            raise RuntimeError(
                "Template workspace is empty — has TemplateSeed run yet?"
            )

        ctx.logger.debug(
            f"Loaded {total} template entries across {len(tree)} types"
        )
        return tree

    async def _write_all(self, ctx, tree):
        """Write every entry, clobbering whatever is present."""
        rows = [
            (type_name, key, json.dumps(value))
            for type_name, entries in tree.items()
            for key, value in entries.items()
        ]
        if rows:
            await ctx.config.put_many(self.workspace, rows)
        ctx.logger.info(
            f"Workspace {self.workspace!r} populated with "
            f"{len(rows)} entries"
        )

    async def _upsert_missing(self, ctx, tree):
        """Write only the keys not already present in the workspace."""
        written = 0
        for type_name, entries in tree.items():
            existing = set(
                await ctx.config.keys(self.workspace, type_name)
            )
            rows = [
                (type_name, key, json.dumps(value))
                for key, value in entries.items()
                if key not in existing
            ]
            if rows:
                await ctx.config.put_many(self.workspace, rows)
                written += len(rows)
        ctx.logger.info(
            f"Workspace {self.workspace!r} upsert-missing: "
            f"{written} new entries"
        )
def is_reserved_workspace(workspace):
    """Reserved workspaces are storage-only.

    Any workspace id beginning with ``_`` is reserved for internal use
    (e.g. ``__template__`` holding factory-default seed config).
    Reads and writes work normally so bootstrap and provisioning code
    can use the standard config API, but **change notifications for
    reserved workspaces are suppressed**. Services subscribed to the
    config push therefore never see reserved-workspace events and
    cannot accidentally act on template content as if it were live
    state.
    """
    # Slice comparison is equivalent to startswith("_") and is safe on
    # the empty string (""[:1] == "" != "_").
    return workspace[:1] == "_"