mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-06-13 08:45:13 +02:00
feat: pluggable bootstrap framework with ordered initialisers
Replaces the previous pair of single-purpose bootstrap processors
(config_bootstrap + pulsar_bootstrap) with a generic Bootstrapper
that runs a configurable list of pluggable "initialisers" in a
reconciliation loop.
Design
------
See docs/tech-specs/bootstrap.md for the full spec.
The bootstrapper is a single AsyncProcessor that:
* Reads a list of initialiser specifications (class, name, flag,
params) from either a direct `initialisers` parameter
(processor-group embedding) or a YAML/JSON file (`-c`, CLI).
* On each wake, runs a cheap service-gate (config-svc + flow-svc
round-trips), then iterates the initialiser list, running each
whose configured flag differs from the one stored in
__system__/init-state/<name>.
* Stores per-initialiser completion state in the reserved
__system__ workspace; services never see these keys because of
the _-prefix notification filter.
* Adapts cadence: ~5s on gate failure, ~15s while converging,
~300s in steady state.
* Isolates failures — one initialiser's exception does not block
others in the same cycle; the failed one retries next wake.
Initialiser contract
--------------------
* Subclass trustgraph.bootstrap.base.Initialiser.
* Implement async run(ctx, old_flag, new_flag).
* Opt out of the service gate with class attr
wait_for_services=False (only used by PulsarTopology — config-svc
cannot come up until Pulsar namespaces exist).
* ctx carries short-lived config and flow-svc clients plus a
scoped logger.
Core initialisers (trustgraph.bootstrap.initialisers.*)
-------------------------------------------------------
* PulsarTopology — creates Pulsar tenant + namespaces
(pre-gate, blocking HTTP offloaded to
executor).
* TemplateSeed — seeds __template__ from an external JSON
file; re-run is upsert-missing by default,
overwrite-all opt-in.
* WorkspaceInit — populates a named workspace from either
__template__ (with an explicit types list)
or a seed file; raises cleanly if the
template isn't seeded yet (retries on
next cycle).
* DefaultFlowStart — starts a specific flow in a workspace;
no-ops if the flow is already running.
Enterprise or third-party initialisers plug in via fully-qualified
dotted class paths in the bootstrapper's configuration — no core
code change required.
pyproject.toml
--------------
* Replaces config-bootstrap + pulsar-bootstrap entries with a
single `bootstrap` script pointing at the new Processor.
This commit is contained in:
parent
42226729bc
commit
1fc0d8f192
15 changed files with 1272 additions and 663 deletions
290
docs/tech-specs/bootstrap.md
Normal file
290
docs/tech-specs/bootstrap.md
Normal file
|
|
@ -0,0 +1,290 @@
|
|||
---
|
||||
layout: default
|
||||
title: "Bootstrap Framework Technical Specification"
|
||||
parent: "Tech Specs"
|
||||
---
|
||||
|
||||
# Bootstrap Framework Technical Specification
|
||||
|
||||
## Overview
|
||||
|
||||
A generic, pluggable framework for running one-time initialisation steps
|
||||
against a TrustGraph deployment — replacing the dedicated
|
||||
`tg-init-trustgraph` container with a long-running processor that
|
||||
converges the system to a desired initial state and then idles.
|
||||
|
||||
The framework is content-agnostic. It knows how to run, retry,
|
||||
mark-as-done, and surface failures; the actual init work lives in
|
||||
small pluggable classes called **initialisers**. Core initialisers
|
||||
ship in the `trustgraph-flow` package; enterprise and third-party
|
||||
initialisers can be loaded by dotted path without any core code
|
||||
change.
|
||||
|
||||
## Motivation
|
||||
|
||||
The existing `tg-init-trustgraph` is a one-shot CLI run in its own
|
||||
container. It performs two very different jobs (Pulsar topology
|
||||
setup and config seeding) in a single script, is wasteful as a whole
|
||||
container, cannot handle partial-success states, and has no way to
|
||||
extend the boot process with enterprise-specific concerns (user
|
||||
provisioning, workspace initialisation, IAM scaffolding) without
|
||||
forking the tool.
|
||||
|
||||
A pluggable, long-running reconciler addresses all of this and slots
|
||||
naturally into the existing processor-group model.
|
||||
|
||||
## Design
|
||||
|
||||
### Bootstrapper Processor
|
||||
|
||||
A single `AsyncProcessor` subclass. One entry in a processor group.
|
||||
Parameters include the processor's own identity and a list of
|
||||
**initialiser specifications** — each spec names a class (by dotted
|
||||
path), a unique instance name, a flag string, and the parameters
|
||||
that will be passed to the initialiser's constructor.
|
||||
|
||||
On each wake the bootstrapper does the following, in order:
|
||||
|
||||
1. Open a short-lived context (config client, flow-svc client,
|
||||
logger). The context is torn down at the end of the wake so
|
||||
steady-state idle cost is effectively nil.
|
||||
2. Run all **pre-service initialisers** (those that opt out of the
|
||||
service gate — principally `PulsarTopology`, which must run
|
||||
before the services it gates on can even come up).
|
||||
3. Check the **service gate**: cheap round-trips to config-svc and
|
||||
flow-svc. If either fails, skip to the sleep step using the
|
||||
short gate-retry cadence.
|
||||
4. Run all **post-service initialisers** that haven't already
|
||||
completed at the currently-configured flag.
|
||||
5. Sleep. Cadence adapts to state (see below).
|
||||
|
||||
### Initialiser Contract
|
||||
|
||||
An initialiser is a class with:
|
||||
|
||||
- A class-level `name` identifier, unique within the bootstrapper's
|
||||
configuration. This is the key under which completion state is
|
||||
stored.
|
||||
- A class-level `wait_for_services` flag. When `True` (the default)
|
||||
the initialiser runs only after the service gate passes. When
|
||||
`False`, it runs before the gate, on every wake.
|
||||
- A constructor that accepts the initialiser's own params as kwargs.
|
||||
- An async `run(ctx, old_flag, new_flag)` method that performs the
|
||||
init work and returns on success. Any raised exception is
|
||||
logged and treated as a transient failure — the stored flag is
|
||||
not updated and the initialiser will re-run on the next cycle.
|
||||
|
||||
`old_flag` is the previously-stored flag string, or `None` if the
|
||||
initialiser has never successfully run in this deployment. `new_flag`
|
||||
is the flag the operator has configured for this run. This pair
|
||||
lets an initialiser distinguish a clean first-run from a migration
|
||||
between flag versions and behave accordingly (see "Flag change and
|
||||
re-run safety" below).
|
||||
|
||||
### Context
|
||||
|
||||
The context is the bootstrapper-owned object passed to every
|
||||
initialiser's `run()` method. Its fields are deliberately narrow:
|
||||
|
||||
| Field | Purpose |
|
||||
|---|---|
|
||||
| `logger` | A child logger named for the initialiser instance |
|
||||
| `config` | A short-lived `ConfigClient` for config-svc reads/writes |
|
||||
| `flow` | A short-lived `RequestResponse` client for flow-svc |
|
||||
|
||||
The context is always fully-populated regardless of which services
|
||||
a given initialiser uses, for symmetry. Additional fields may be
|
||||
added in future without breaking existing initialisers. Clients are
|
||||
started at the beginning of a wake cycle and stopped at the end.
|
||||
|
||||
Initialisers that need services beyond config-svc and flow-svc are
|
||||
responsible for their own readiness checks and for raising cleanly
|
||||
when a prerequisite is not met.
|
||||
|
||||
### Completion State
|
||||
|
||||
Per-initialiser completion state is stored in the reserved
|
||||
`__system__` workspace, under a dedicated config type for bootstrap
|
||||
state. The stored value is the flag string that was configured when
|
||||
the initialiser last succeeded.
|
||||
|
||||
On each cycle, for each initialiser, the bootstrapper reads the
|
||||
stored flag and compares it to the currently-configured flag. If
|
||||
they match, the initialiser is skipped silently. If they differ,
|
||||
the initialiser runs; on success, the stored flag is updated.
|
||||
|
||||
Because the state lives in a reserved (`_`-prefixed) workspace, it
|
||||
is stored by config-svc but excluded from the config push broadcast.
|
||||
Live processors never see it and cannot act on it.
|
||||
|
||||
### The Service Gate
|
||||
|
||||
The gate is a cheap, bootstrapper-internal check that config-svc
|
||||
and flow-svc are both reachable and responsive. It is intentionally
|
||||
a simple pair of low-cost round-trips — a config list against
|
||||
`__system__` and a flow-svc `list-blueprints` — rather than any
|
||||
deeper health check.
|
||||
|
||||
Its purpose is to avoid filling logs with noise and to concentrate
|
||||
retry effort during the brief window when services are coming up.
|
||||
The gate is applied only to initialisers with
|
||||
`wait_for_services=True` (the default); `False` is reserved for
|
||||
initialisers that set up infrastructure the gate itself depends on.
|
||||
|
||||
### Adaptive Cadence
|
||||
|
||||
The sleep between wake cycles is chosen from three tiers based on
|
||||
observed state:
|
||||
|
||||
| Tier | Duration | When |
|
||||
|---|---|---|
|
||||
| Gate backoff | ~5 s | Services not responding — concentrate retry during startup |
|
||||
| Init retry | ~15 s | Gate passes but at least one initialiser is not yet at its configured flag — transient failures, waiting on prereqs, recently-bumped flag not yet applied |
|
||||
| Steady | ~300 s | All configured initialisers at their configured flag; gate passes; nothing to do |
|
||||
|
||||
The short tiers ensure a fresh deployment converges quickly;
|
||||
steady state costs a single round-trip per initialiser every few
|
||||
minutes.
|
||||
|
||||
### Failure Handling
|
||||
|
||||
An initialiser raising an exception does not stop the bootstrapper
|
||||
or block other initialisers. Each initialiser in the cycle is
|
||||
attempted independently; failures are logged and retried on the next
|
||||
cycle. This means there is no ordered-DAG enforcement: order of
|
||||
initialisers in the configuration determines the attempt order
|
||||
within a cycle, but a dependency between two initialisers is
|
||||
expressed by the dependant raising cleanly when its prerequisite
|
||||
isn't satisfied. Over successive cycles the system converges.
|
||||
|
||||
### Flag Change and Re-run Safety
|
||||
|
||||
Each initialiser's completion state is a string flag chosen by the
|
||||
operator. Typically these follow a simple version pattern
|
||||
(`v1`, `v2`, ...), but the bootstrapper imposes no format.
|
||||
|
||||
Changing the flag in the group configuration causes the
|
||||
corresponding initialiser to re-run on the next cycle. Initialisers
|
||||
must be written so that re-running after a flag bump is safe — they
|
||||
receive both the previous and the new flag and are responsible for
|
||||
either cleanly re-applying the work or performing a step-change
|
||||
migration from the prior state.
|
||||
|
||||
This gives operators an explicit, visible mechanism for triggering
|
||||
re-initialisation. Re-runs are never implicit.
|
||||
|
||||
## Core Initialisers
|
||||
|
||||
The following initialisers ship in `trustgraph.bootstrap.initialisers`
|
||||
and cover the base deployment case.
|
||||
|
||||
### PulsarTopology
|
||||
|
||||
Creates the Pulsar tenant and the four namespaces
|
||||
(`flow`, `request`, `response`, `notify`) with appropriate
|
||||
retention policies if they don't exist.
|
||||
|
||||
Opts out of the service gate (`wait_for_services = False`) because
|
||||
config-svc and flow-svc cannot come online until the Pulsar
|
||||
namespaces exist.
|
||||
|
||||
Parameters: Pulsar admin URL, tenant name.
|
||||
|
||||
Idempotent via the admin API (GET-then-PUT). Flag change causes
|
||||
re-evaluation of all namespaces; any absent are created.
|
||||
|
||||
### TemplateSeed
|
||||
|
||||
Populates the reserved `__template__` workspace from an external
|
||||
JSON seed file. The seed file has the standard shape of
|
||||
`{config-type: {config-key: value}}`.
|
||||
|
||||
Runs post-gate. Parameters: path to the seed file, overwrite
|
||||
policy (upsert-missing only, or overwrite-all).
|
||||
|
||||
On clean run, writes the whole file. On flag change, behaviour
|
||||
depends on the overwrite policy — typically upsert-missing so
|
||||
that operator-customised keys are preserved across seed-file
|
||||
upgrades.
|
||||
|
||||
### WorkspaceInit
|
||||
|
||||
Creates a named workspace and populates it from the seed file (or
|
||||
from the `__template__` workspace).
|
||||
|
||||
Runs post-gate. Parameters: workspace name, source (seed file or
|
||||
`__template__`).
|
||||
|
||||
Raises cleanly if its source does not exist — depends on
|
||||
`TemplateSeed` having run in the same cycle or a prior one.
|
||||
|
||||
### DefaultFlowStart
|
||||
|
||||
Starts a specific flow in a specific workspace using a specific
|
||||
blueprint.
|
||||
|
||||
Runs post-gate. Parameters: workspace name, flow id, blueprint
|
||||
name, description, optional parameter overrides.
|
||||
|
||||
Separated from `WorkspaceInit` deliberately so that deployments
|
||||
which want a workspace without an auto-started flow can simply omit
|
||||
this initialiser from their bootstrap configuration.
|
||||
|
||||
## Extensibility
|
||||
|
||||
New initialisers are added by:
|
||||
|
||||
1. Subclassing the initialiser base class.
|
||||
2. Implementing `run(ctx, old_flag, new_flag)`.
|
||||
3. Choosing `wait_for_services` (almost always `True`).
|
||||
4. Adding an entry in the bootstrapper's configuration with the new
|
||||
class's dotted path.
|
||||
|
||||
No core code changes are required to add an enterprise or third-party
|
||||
initialiser. Enterprise builds ship their own package with their own
|
||||
initialiser classes (e.g. `CreateAdminUser`, `ProvisionWorkspaces`)
|
||||
and reference them in the bootstrapper config alongside the core
|
||||
initialisers.
|
||||
|
||||
## Reserved Workspaces
|
||||
|
||||
This specification relies on the "reserved workspace" convention:
|
||||
|
||||
- Any workspace id beginning with `_` is reserved.
|
||||
- Reserved workspaces are stored normally by config-svc but never
|
||||
appear in the config push broadcast.
|
||||
- Live processors cannot react to reserved-workspace state.
|
||||
|
||||
The bootstrapper uses two reserved workspaces:
|
||||
|
||||
- `__template__` — factory-default seed config, readable by
|
||||
initialisers that copy-from-template.
|
||||
- `__system__` — bootstrapper completion state (under the
|
||||
`init-state` config type) and any other system-internal bookkeeping.
|
||||
|
||||
See the reserved-workspace convention in the config service for
|
||||
the general rule and its enforcement.
|
||||
|
||||
## Non-Goals
|
||||
|
||||
- No DAG scheduling across initialisers. Dependencies are expressed
|
||||
by the dependant failing cleanly until its prerequisite is met,
|
||||
and convergence over subsequent cycles.
|
||||
- No parallel execution of initialisers within a cycle. A cycle runs
|
||||
each initialiser sequentially.
|
||||
- No implicit re-runs. Re-running an initialiser requires an explicit
|
||||
flag change by the operator.
|
||||
- No cross-initialiser atomicity. Each initialiser's completion is
|
||||
recorded independently on its own success.
|
||||
|
||||
## Operational Notes
|
||||
|
||||
- Running the bootstrapper as a processor-group entry replaces the
|
||||
`tg-init-trustgraph` container. The script remains CLI-invocable
|
||||
for standalone testing (`Processor.launch(...)` pattern).
|
||||
- First-boot convergence is typically a handful of short cycles
|
||||
followed by a transition to the steady cadence. Deployments
|
||||
should expect the first few minutes of logs to show
|
||||
initialisation activity, thereafter effective silence.
|
||||
- Bumping a flag is a deliberate operational act. The log line
|
||||
emitted on re-run makes the event visible for audit.
|
||||
|
|
@ -60,10 +60,9 @@ agent-orchestrator = "trustgraph.agent.orchestrator:run"
|
|||
api-gateway = "trustgraph.gateway:run"
|
||||
chunker-recursive = "trustgraph.chunking.recursive:run"
|
||||
chunker-token = "trustgraph.chunking.token:run"
|
||||
bootstrap = "trustgraph.bootstrap.bootstrapper:run"
|
||||
config-svc = "trustgraph.config.service:run"
|
||||
config-bootstrap = "trustgraph.bootstrap.config_bootstrap:run"
|
||||
flow-svc = "trustgraph.flow.service:run"
|
||||
pulsar-bootstrap = "trustgraph.bootstrap.pulsar_bootstrap:run"
|
||||
doc-embeddings-query-milvus = "trustgraph.query.doc_embeddings.milvus:run"
|
||||
doc-embeddings-query-pinecone = "trustgraph.query.doc_embeddings.pinecone:run"
|
||||
doc-embeddings-query-qdrant = "trustgraph.query.doc_embeddings.qdrant:run"
|
||||
|
|
|
|||
68
trustgraph-flow/trustgraph/bootstrap/base.py
Normal file
68
trustgraph-flow/trustgraph/bootstrap/base.py
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
"""
|
||||
Bootstrap framework: Initialiser base class and per-wake context.
|
||||
|
||||
See docs/tech-specs/bootstrap.md for the full design.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class InitContext:
|
||||
"""Shared per-wake context passed to each initialiser.
|
||||
|
||||
The bootstrapper constructs one of these on every wake cycle,
|
||||
tears it down at cycle end, and passes it into each initialiser's
|
||||
``run()`` method. Fields are short-lived and safe to use during
|
||||
a single cycle only.
|
||||
"""
|
||||
|
||||
logger: logging.Logger
|
||||
config: Any # ConfigClient
|
||||
flow: Any # RequestResponse client for flow-svc
|
||||
|
||||
|
||||
class Initialiser:
|
||||
"""Base class for bootstrap initialisers.
|
||||
|
||||
Subclasses implement :meth:`run`. The bootstrapper manages
|
||||
completion state, flag comparison, retry and error handling —
|
||||
subclasses describe only the work to perform.
|
||||
|
||||
Class attributes:
|
||||
|
||||
* ``wait_for_services`` (bool, default ``True``): when ``True`` the
|
||||
initialiser only runs after the bootstrapper's service gate has
|
||||
passed (config-svc and flow-svc reachable). Set ``False`` for
|
||||
initialisers that bring up infrastructure the gate itself
|
||||
depends on — principally Pulsar topology, without which
|
||||
config-svc cannot come online.
|
||||
"""
|
||||
|
||||
wait_for_services: bool = True
|
||||
|
||||
def __init__(self, **params):
|
||||
# Subclasses should consume their own params via keyword
|
||||
# arguments in their own __init__ signatures. This catch-all
|
||||
# is here so any kwargs that filter through unnoticed don't
|
||||
# raise TypeError on construction.
|
||||
pass
|
||||
|
||||
async def run(self, ctx, old_flag, new_flag):
|
||||
"""Perform initialisation work.
|
||||
|
||||
:param ctx: :class:`InitContext` with logger, config client,
|
||||
flow-svc client.
|
||||
:param old_flag: Previously-stored flag string, or ``None`` if
|
||||
this initialiser has never successfully completed in this
|
||||
deployment.
|
||||
:param new_flag: Currently-configured flag. A string chosen
|
||||
by the operator; typically something like ``"v1"``.
|
||||
|
||||
:raises: Any exception on failure. The bootstrapper catches,
|
||||
logs, and re-runs on the next cycle; completion state is
|
||||
only written on clean return.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
414
trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py
Normal file
414
trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py
Normal file
|
|
@ -0,0 +1,414 @@
|
|||
"""
|
||||
Bootstrapper processor.
|
||||
|
||||
Runs a pluggable list of initialisers in a reconciliation loop.
|
||||
Each initialiser's completion state is recorded in the reserved
|
||||
``__system__`` workspace under the ``init-state`` config type.
|
||||
|
||||
See docs/tech-specs/bootstrap.md for the full design.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import importlib
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from argparse import ArgumentParser
|
||||
from dataclasses import dataclass
|
||||
|
||||
from trustgraph.base import AsyncProcessor
|
||||
from trustgraph.base import ProducerMetrics, SubscriberMetrics
|
||||
from trustgraph.base.config_client import ConfigClient
|
||||
from trustgraph.base.request_response_spec import RequestResponse
|
||||
from trustgraph.schema import (
|
||||
ConfigRequest, ConfigResponse,
|
||||
config_request_queue, config_response_queue,
|
||||
)
|
||||
from trustgraph.schema import (
|
||||
FlowRequest, FlowResponse,
|
||||
flow_request_queue, flow_response_queue,
|
||||
)
|
||||
|
||||
from .. base import Initialiser, InitContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
default_ident = "bootstrap"
|
||||
|
||||
# Reserved workspace + config type under which completion state is
|
||||
# stored. Reserved (`_`-prefix) workspaces are excluded from the
|
||||
# config push broadcast — live processors never see these keys.
|
||||
SYSTEM_WORKSPACE = "__system__"
|
||||
INIT_STATE_TYPE = "init-state"
|
||||
|
||||
# Cadence tiers.
|
||||
GATE_BACKOFF = 5 # Services not responding; retry soon.
|
||||
INIT_RETRY = 15 # Gate passed but something ran/failed;
|
||||
# converge quickly.
|
||||
STEADY_INTERVAL = 300 # Everything at target flag; idle cheaply.
|
||||
|
||||
|
||||
@dataclass
|
||||
class InitialiserSpec:
|
||||
"""One entry in the bootstrapper's configured list of initialisers."""
|
||||
name: str
|
||||
flag: str
|
||||
instance: Initialiser
|
||||
|
||||
|
||||
def _resolve_class(dotted):
|
||||
"""Import and return a class by its dotted path."""
|
||||
module_path, _, class_name = dotted.rpartition(".")
|
||||
if not module_path:
|
||||
raise ValueError(
|
||||
f"Initialiser class must be a dotted path, got {dotted!r}"
|
||||
)
|
||||
module = importlib.import_module(module_path)
|
||||
return getattr(module, class_name)
|
||||
|
||||
|
||||
def _load_initialisers_file(path):
|
||||
"""Load the initialisers spec list from a YAML or JSON file.
|
||||
|
||||
File shape:
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
initialisers:
|
||||
- class: trustgraph.bootstrap.initialisers.PulsarTopology
|
||||
name: pulsar-topology
|
||||
flag: v1
|
||||
params:
|
||||
admin_url: http://pulsar:8080
|
||||
tenant: tg
|
||||
- ...
|
||||
"""
|
||||
with open(path) as f:
|
||||
content = f.read()
|
||||
if path.endswith((".yaml", ".yml")):
|
||||
import yaml
|
||||
doc = yaml.safe_load(content)
|
||||
else:
|
||||
doc = json.loads(content)
|
||||
if not isinstance(doc, dict) or "initialisers" not in doc:
|
||||
raise RuntimeError(
|
||||
f"{path}: expected a mapping with an 'initialisers' key"
|
||||
)
|
||||
return doc["initialisers"]
|
||||
|
||||
|
||||
class Processor(AsyncProcessor):
|
||||
|
||||
def __init__(self, **params):
|
||||
|
||||
super().__init__(**params)
|
||||
|
||||
# Source the initialisers list either from a direct parameter
|
||||
# (processor-group embedding) or from a file (CLI launch).
|
||||
inits = params.get("initialisers")
|
||||
if inits is None:
|
||||
inits_file = params.get("initialisers_file")
|
||||
if inits_file is None:
|
||||
raise RuntimeError(
|
||||
"Bootstrapper requires either the 'initialisers' "
|
||||
"parameter or --initialisers-file"
|
||||
)
|
||||
inits = _load_initialisers_file(inits_file)
|
||||
|
||||
self.specs = []
|
||||
names = set()
|
||||
|
||||
for entry in inits:
|
||||
if not isinstance(entry, dict):
|
||||
raise RuntimeError(
|
||||
f"Initialiser entry must be a mapping, got: {entry!r}"
|
||||
)
|
||||
for required in ("class", "name", "flag"):
|
||||
if required not in entry:
|
||||
raise RuntimeError(
|
||||
f"Initialiser entry missing required field "
|
||||
f"{required!r}: {entry!r}"
|
||||
)
|
||||
|
||||
name = entry["name"]
|
||||
if name in names:
|
||||
raise RuntimeError(f"Duplicate initialiser name {name!r}")
|
||||
names.add(name)
|
||||
|
||||
cls = _resolve_class(entry["class"])
|
||||
|
||||
try:
|
||||
instance = cls(**entry.get("params", {}))
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
f"Failed to instantiate initialiser "
|
||||
f"{entry['class']!r} as {name!r}: "
|
||||
f"{type(e).__name__}: {e}"
|
||||
)
|
||||
|
||||
self.specs.append(InitialiserSpec(
|
||||
name=name,
|
||||
flag=entry["flag"],
|
||||
instance=instance,
|
||||
))
|
||||
|
||||
logger.info(
|
||||
f"Bootstrapper: loaded {len(self.specs)} initialisers"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Client construction (short-lived per wake cycle).
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _make_config_client(self):
|
||||
rr_id = str(uuid.uuid4())
|
||||
return ConfigClient(
|
||||
backend=self.pubsub_backend,
|
||||
subscription=f"{self.id}--config--{rr_id}",
|
||||
consumer_name=self.id,
|
||||
request_topic=config_request_queue,
|
||||
request_schema=ConfigRequest,
|
||||
request_metrics=ProducerMetrics(
|
||||
processor=self.id, flow=None, name="config-request",
|
||||
),
|
||||
response_topic=config_response_queue,
|
||||
response_schema=ConfigResponse,
|
||||
response_metrics=SubscriberMetrics(
|
||||
processor=self.id, flow=None, name="config-response",
|
||||
),
|
||||
)
|
||||
|
||||
def _make_flow_client(self):
|
||||
rr_id = str(uuid.uuid4())
|
||||
return RequestResponse(
|
||||
backend=self.pubsub_backend,
|
||||
subscription=f"{self.id}--flow--{rr_id}",
|
||||
consumer_name=self.id,
|
||||
request_topic=flow_request_queue,
|
||||
request_schema=FlowRequest,
|
||||
request_metrics=ProducerMetrics(
|
||||
processor=self.id, flow=None, name="flow-request",
|
||||
),
|
||||
response_topic=flow_response_queue,
|
||||
response_schema=FlowResponse,
|
||||
response_metrics=SubscriberMetrics(
|
||||
processor=self.id, flow=None, name="flow-response",
|
||||
),
|
||||
)
|
||||
|
||||
async def _open_clients(self):
|
||||
config = self._make_config_client()
|
||||
flow = self._make_flow_client()
|
||||
await config.start()
|
||||
try:
|
||||
await flow.start()
|
||||
except Exception:
|
||||
await self._safe_stop(config)
|
||||
raise
|
||||
return config, flow
|
||||
|
||||
async def _safe_stop(self, client):
|
||||
try:
|
||||
await client.stop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Service gate.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _gate_ready(self, config, flow):
|
||||
try:
|
||||
await config.keys(SYSTEM_WORKSPACE, INIT_STATE_TYPE)
|
||||
except Exception as e:
|
||||
logger.info(
|
||||
f"Gate: config-svc not ready ({type(e).__name__}: {e})"
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
resp = await flow.request(
|
||||
FlowRequest(
|
||||
operation="list-blueprints",
|
||||
workspace=SYSTEM_WORKSPACE,
|
||||
),
|
||||
timeout=5,
|
||||
)
|
||||
if resp.error:
|
||||
logger.info(
|
||||
f"Gate: flow-svc error: "
|
||||
f"{resp.error.type}: {resp.error.message}"
|
||||
)
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.info(
|
||||
f"Gate: flow-svc not ready ({type(e).__name__}: {e})"
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Completion state.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _stored_flag(self, config, name):
|
||||
raw = await config.get(SYSTEM_WORKSPACE, INIT_STATE_TYPE, name)
|
||||
if raw is None:
|
||||
return None
|
||||
try:
|
||||
return json.loads(raw)
|
||||
except Exception:
|
||||
return raw
|
||||
|
||||
async def _store_flag(self, config, name, flag):
|
||||
await config.put(
|
||||
SYSTEM_WORKSPACE, INIT_STATE_TYPE, name,
|
||||
json.dumps(flag),
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Per-spec execution.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _run_spec(self, spec, config, flow):
|
||||
"""Run a single initialiser spec.
|
||||
|
||||
Returns one of:
|
||||
- ``"skip"``: stored flag already matches target, nothing to do.
|
||||
- ``"ran"``: initialiser ran and completion state was updated.
|
||||
- ``"failed"``: initialiser raised.
|
||||
- ``"failed-state-write"``: initialiser succeeded but we could
|
||||
not persist the new flag (transient — will re-run next cycle).
|
||||
"""
|
||||
|
||||
try:
|
||||
old_flag = await self._stored_flag(config, spec.name)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"{spec.name}: could not read stored flag "
|
||||
f"({type(e).__name__}: {e})"
|
||||
)
|
||||
return "failed"
|
||||
|
||||
if old_flag == spec.flag:
|
||||
return "skip"
|
||||
|
||||
child_logger = logger.getChild(spec.name)
|
||||
child_ctx = InitContext(
|
||||
logger=child_logger,
|
||||
config=config,
|
||||
flow=flow,
|
||||
)
|
||||
|
||||
child_logger.info(
|
||||
f"Running (old_flag={old_flag!r} -> new_flag={spec.flag!r})"
|
||||
)
|
||||
|
||||
try:
|
||||
await spec.instance.run(child_ctx, old_flag, spec.flag)
|
||||
except Exception as e:
|
||||
child_logger.error(
|
||||
f"Failed: {type(e).__name__}: {e}", exc_info=True,
|
||||
)
|
||||
return "failed"
|
||||
|
||||
try:
|
||||
await self._store_flag(config, spec.name, spec.flag)
|
||||
except Exception as e:
|
||||
child_logger.warning(
|
||||
f"Completed but could not persist state flag "
|
||||
f"({type(e).__name__}: {e}); will re-run next cycle"
|
||||
)
|
||||
return "failed-state-write"
|
||||
|
||||
child_logger.info(f"Completed (flag={spec.flag!r})")
|
||||
return "ran"
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Main loop.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def run(self):
|
||||
|
||||
logger.info(
|
||||
f"Bootstrapper starting with {len(self.specs)} initialisers"
|
||||
)
|
||||
|
||||
while self.running:
|
||||
|
||||
sleep_for = STEADY_INTERVAL
|
||||
|
||||
try:
|
||||
config, flow = await self._open_clients()
|
||||
except Exception as e:
|
||||
logger.info(
|
||||
f"Failed to open clients "
|
||||
f"({type(e).__name__}: {e}); retry in {GATE_BACKOFF}s"
|
||||
)
|
||||
await asyncio.sleep(GATE_BACKOFF)
|
||||
continue
|
||||
|
||||
try:
|
||||
# Phase 1: pre-service initialisers run unconditionally.
|
||||
pre_specs = [
|
||||
s for s in self.specs
|
||||
if not s.instance.wait_for_services
|
||||
]
|
||||
pre_results = {}
|
||||
for spec in pre_specs:
|
||||
pre_results[spec.name] = await self._run_spec(
|
||||
spec, config, flow,
|
||||
)
|
||||
|
||||
# Phase 2: gate.
|
||||
gate_ok = await self._gate_ready(config, flow)
|
||||
|
||||
# Phase 3: post-service initialisers, if gate passed.
|
||||
post_results = {}
|
||||
if gate_ok:
|
||||
post_specs = [
|
||||
s for s in self.specs
|
||||
if s.instance.wait_for_services
|
||||
]
|
||||
for spec in post_specs:
|
||||
post_results[spec.name] = await self._run_spec(
|
||||
spec, config, flow,
|
||||
)
|
||||
|
||||
# Cadence selection.
|
||||
if not gate_ok:
|
||||
sleep_for = GATE_BACKOFF
|
||||
else:
|
||||
all_results = {**pre_results, **post_results}
|
||||
if any(r != "skip" for r in all_results.values()):
|
||||
sleep_for = INIT_RETRY
|
||||
else:
|
||||
sleep_for = STEADY_INTERVAL
|
||||
|
||||
finally:
|
||||
await self._safe_stop(config)
|
||||
await self._safe_stop(flow)
|
||||
|
||||
await asyncio.sleep(sleep_for)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# CLI arg plumbing.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
AsyncProcessor.add_args(parser)
|
||||
|
||||
parser.add_argument(
|
||||
'-c', '--initialisers-file',
|
||||
help='Path to YAML or JSON file describing the '
|
||||
'initialisers to run. Ignored when the '
|
||||
"'initialisers' parameter is provided directly "
|
||||
'(e.g. when running inside a processor group).',
|
||||
)
|
||||
|
||||
|
||||
def run():
|
||||
Processor.launch(default_ident, __doc__)
|
||||
|
|
@ -1,430 +0,0 @@
|
|||
"""
|
||||
Config bootstrap service.
|
||||
|
||||
Runs a slow reconciliation loop that:
|
||||
|
||||
1. Seeds the ``__template__`` workspace from an external JSON seed
|
||||
file on first boot. The seed file has the same shape as
|
||||
``tg-init-trustgraph``'s input: a top-level dict of
|
||||
``{ config-type: { config-key: value } }`` where values are the
|
||||
raw (JSON-serialisable) payloads to store.
|
||||
|
||||
2. On first boot, copies ``__template__`` into a normal workspace
|
||||
(default: ``default``) so a single-tenant system has a working
|
||||
workspace without manual provisioning.
|
||||
|
||||
3. Optionally starts a default flow in that workspace using a
|
||||
specified blueprint.
|
||||
|
||||
Each of these three steps is gated on a durable marker stored in the
|
||||
reserved ``__system__`` workspace. Markers are set on successful
|
||||
completion of their own step; once set they are never touched again,
|
||||
so the loop becomes a cheap no-op after first boot. Markers are also
|
||||
independent: a partial failure (e.g. default workspace seeded but
|
||||
flow-start failed) is resumable on next wake.
|
||||
|
||||
Reserved workspaces ``__template__`` and ``__system__`` start with an
|
||||
underscore and so are excluded from config push notifications — no
|
||||
live service sees their contents.
|
||||
|
||||
Cadence:
|
||||
- CYCLE_INTERVAL (300s) on success / steady state
|
||||
- CONNECT_BACKOFF (10s) on connection failure to config-svc
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from argparse import ArgumentParser
|
||||
|
||||
from trustgraph.base import AsyncProcessor
|
||||
from trustgraph.base import ProducerMetrics, SubscriberMetrics
|
||||
from trustgraph.base.config_client import ConfigClient
|
||||
from trustgraph.schema import (
|
||||
ConfigRequest, ConfigResponse,
|
||||
config_request_queue, config_response_queue,
|
||||
)
|
||||
from trustgraph.schema import (
|
||||
FlowRequest, FlowResponse,
|
||||
flow_request_queue, flow_response_queue,
|
||||
)
|
||||
from trustgraph.base.request_response_spec import RequestResponse
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
default_ident = "config-bootstrap"
|
||||
|
||||
# Reserved workspaces
|
||||
TEMPLATE_WORKSPACE = "__template__"
|
||||
SYSTEM_WORKSPACE = "__system__"
|
||||
|
||||
# System marker type + keys
|
||||
BOOTSTRAP_MARKER_TYPE = "bootstrap"
|
||||
TEMPLATE_SEEDED_KEY = "template-seeded"
|
||||
DEFAULT_WORKSPACE_SEEDED_KEY = "default-workspace-seeded"
|
||||
DEFAULT_FLOW_STARTED_KEY = "default-flow-started"
|
||||
|
||||
# Loop cadence
|
||||
CYCLE_INTERVAL = 300
|
||||
CONNECT_BACKOFF = 10
|
||||
|
||||
default_default_workspace = "default"
|
||||
default_default_flow_id = "default"
|
||||
default_default_flow_description = "Default"
|
||||
|
||||
|
||||
class Processor(AsyncProcessor):
|
||||
|
||||
def __init__(self, **params):
|
||||
|
||||
super().__init__(**params)
|
||||
|
||||
self.config_file = params.get("config_file")
|
||||
|
||||
self.default_workspace = params.get(
|
||||
"default_workspace", default_default_workspace,
|
||||
)
|
||||
self.default_flow_id = params.get(
|
||||
"default_flow_id", default_default_flow_id,
|
||||
)
|
||||
self.default_flow_blueprint = params.get(
|
||||
"default_flow_blueprint",
|
||||
)
|
||||
self.default_flow_description = params.get(
|
||||
"default_flow_description", default_default_flow_description,
|
||||
)
|
||||
|
||||
# Optional flow parameters as JSON string
|
||||
default_flow_parameters_raw = params.get("default_flow_parameters")
|
||||
if default_flow_parameters_raw:
|
||||
try:
|
||||
self.default_flow_parameters = json.loads(
|
||||
default_flow_parameters_raw
|
||||
)
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
f"Invalid --default-flow-parameters JSON: {e}"
|
||||
)
|
||||
else:
|
||||
self.default_flow_parameters = {}
|
||||
|
||||
if self.config_file is None:
|
||||
raise RuntimeError(
|
||||
"config_file is required (--config-file)"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Config bootstrap: seed file={self.config_file} "
|
||||
f"default_workspace={self.default_workspace} "
|
||||
f"default_flow_id={self.default_flow_id} "
|
||||
f"default_flow_blueprint={self.default_flow_blueprint or '(none)'}"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Helpers for talking to config-svc and flow-svc via req/resp.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _make_config_client(self):
|
||||
"""Short-lived ConfigClient over the shared pubsub backend."""
|
||||
|
||||
rr_id = str(uuid.uuid4())
|
||||
|
||||
req_metrics = ProducerMetrics(
|
||||
processor=self.id, flow=None, name="config-request",
|
||||
)
|
||||
resp_metrics = SubscriberMetrics(
|
||||
processor=self.id, flow=None, name="config-response",
|
||||
)
|
||||
|
||||
return ConfigClient(
|
||||
backend=self.pubsub_backend,
|
||||
subscription=f"{self.id}--config--{rr_id}",
|
||||
consumer_name=self.id,
|
||||
request_topic=config_request_queue,
|
||||
request_schema=ConfigRequest,
|
||||
request_metrics=req_metrics,
|
||||
response_topic=config_response_queue,
|
||||
response_schema=ConfigResponse,
|
||||
response_metrics=resp_metrics,
|
||||
)
|
||||
|
||||
def _make_flow_client(self):
|
||||
"""Short-lived RequestResponse client for flow-svc."""
|
||||
|
||||
rr_id = str(uuid.uuid4())
|
||||
|
||||
req_metrics = ProducerMetrics(
|
||||
processor=self.id, flow=None, name="flow-request",
|
||||
)
|
||||
resp_metrics = SubscriberMetrics(
|
||||
processor=self.id, flow=None, name="flow-response",
|
||||
)
|
||||
|
||||
return RequestResponse(
|
||||
backend=self.pubsub_backend,
|
||||
subscription=f"{self.id}--flow--{rr_id}",
|
||||
consumer_name=self.id,
|
||||
request_topic=flow_request_queue,
|
||||
request_schema=FlowRequest,
|
||||
request_metrics=req_metrics,
|
||||
response_topic=flow_response_queue,
|
||||
response_schema=FlowResponse,
|
||||
response_metrics=resp_metrics,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Marker helpers. Markers live in __system__/bootstrap/<key>.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _marker_is_set(self, config, key):
|
||||
value = await config.get(
|
||||
SYSTEM_WORKSPACE, BOOTSTRAP_MARKER_TYPE, key,
|
||||
)
|
||||
return value is not None
|
||||
|
||||
async def _marker_set(self, config, key):
|
||||
await config.put(
|
||||
SYSTEM_WORKSPACE, BOOTSTRAP_MARKER_TYPE, key,
|
||||
json.dumps(True),
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Seed data.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _load_seed(self):
|
||||
"""Read the seed JSON file from disk. Returns
|
||||
``{type: {key: raw-value}}``."""
|
||||
with open(self.config_file) as f:
|
||||
return json.load(f)
|
||||
|
||||
async def _write_config_tree(self, config, workspace, tree):
|
||||
"""Write every (type, key, value) pair from a seed tree into
|
||||
the given workspace as a single batched put."""
|
||||
values = []
|
||||
for type_name, entries in tree.items():
|
||||
for key, value in entries.items():
|
||||
values.append(
|
||||
(type_name, key, json.dumps(value))
|
||||
)
|
||||
if values:
|
||||
await config.put_many(workspace, values)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Reconciliation steps.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _seed_template(self, config):
|
||||
"""Step 1: populate __template__ from the seed file if not
|
||||
already seeded."""
|
||||
|
||||
if await self._marker_is_set(config, TEMPLATE_SEEDED_KEY):
|
||||
logger.debug("Template already seeded.")
|
||||
return False
|
||||
|
||||
logger.info(
|
||||
f"Seeding __template__ workspace from {self.config_file}"
|
||||
)
|
||||
|
||||
seed = self._load_seed()
|
||||
|
||||
await self._write_config_tree(config, TEMPLATE_WORKSPACE, seed)
|
||||
|
||||
await self._marker_set(config, TEMPLATE_SEEDED_KEY)
|
||||
|
||||
logger.info("Template seeded.")
|
||||
return True
|
||||
|
||||
async def _bootstrap_default_workspace(self, config):
|
||||
"""Step 2: copy __template__ content into the default
|
||||
workspace on first boot."""
|
||||
|
||||
if await self._marker_is_set(
|
||||
config, DEFAULT_WORKSPACE_SEEDED_KEY,
|
||||
):
|
||||
logger.debug("Default workspace already bootstrapped.")
|
||||
return False
|
||||
|
||||
logger.info(
|
||||
f"Bootstrapping default workspace '{self.default_workspace}' "
|
||||
f"from __template__"
|
||||
)
|
||||
|
||||
# Re-read the seed file so the default workspace is populated
|
||||
# from the same source of truth as __template__. We do not
|
||||
# copy from __template__ via the config service because
|
||||
# __template__ may have been edited after seeding; the seed
|
||||
# file is the factory default.
|
||||
seed = self._load_seed()
|
||||
|
||||
await self._write_config_tree(
|
||||
config, self.default_workspace, seed,
|
||||
)
|
||||
|
||||
await self._marker_set(config, DEFAULT_WORKSPACE_SEEDED_KEY)
|
||||
|
||||
logger.info(
|
||||
f"Default workspace '{self.default_workspace}' bootstrapped."
|
||||
)
|
||||
return True
|
||||
|
||||
async def _start_default_flow(self, config):
|
||||
"""Step 3: start a default flow if a blueprint was supplied
|
||||
and the flow has not already been started."""
|
||||
|
||||
if self.default_flow_blueprint is None:
|
||||
logger.debug(
|
||||
"No default-flow-blueprint configured, skipping."
|
||||
)
|
||||
return False
|
||||
|
||||
if await self._marker_is_set(
|
||||
config, DEFAULT_FLOW_STARTED_KEY,
|
||||
):
|
||||
logger.debug("Default flow already started.")
|
||||
return False
|
||||
|
||||
logger.info(
|
||||
f"Starting default flow "
|
||||
f"'{self.default_flow_id}' "
|
||||
f"(blueprint '{self.default_flow_blueprint}') "
|
||||
f"in workspace '{self.default_workspace}'"
|
||||
)
|
||||
|
||||
flow_client = self._make_flow_client()
|
||||
try:
|
||||
await flow_client.start()
|
||||
|
||||
req = FlowRequest(
|
||||
operation="start-flow",
|
||||
workspace=self.default_workspace,
|
||||
flow_id=self.default_flow_id,
|
||||
blueprint_name=self.default_flow_blueprint,
|
||||
description=self.default_flow_description,
|
||||
parameters=self.default_flow_parameters,
|
||||
)
|
||||
|
||||
resp = await flow_client.request(req, timeout=30)
|
||||
|
||||
if resp.error:
|
||||
raise RuntimeError(
|
||||
f"start-flow failed: "
|
||||
f"{resp.error.type}: {resp.error.message}"
|
||||
)
|
||||
|
||||
finally:
|
||||
try:
|
||||
await flow_client.stop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
await self._marker_set(config, DEFAULT_FLOW_STARTED_KEY)
|
||||
|
||||
logger.info("Default flow started.")
|
||||
return True
|
||||
|
||||
async def _reconcile_once(self):
|
||||
"""Run all three steps in order; each is a cheap no-op once
|
||||
its marker is set."""
|
||||
|
||||
config = self._make_config_client()
|
||||
try:
|
||||
await config.start()
|
||||
await self._seed_template(config)
|
||||
await self._bootstrap_default_workspace(config)
|
||||
await self._start_default_flow(config)
|
||||
finally:
|
||||
try:
|
||||
await config.stop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Main loop — the processor-group / standalone entry point.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def run(self):
|
||||
|
||||
logger.info("Config bootstrap loop starting.")
|
||||
|
||||
while self.running:
|
||||
|
||||
try:
|
||||
await self._reconcile_once()
|
||||
|
||||
except (ConnectionError, TimeoutError, OSError) as e:
|
||||
logger.info(
|
||||
f"Bootstrap: connect/timeout error "
|
||||
f"({type(e).__name__}: {e}); retry in "
|
||||
f"{CONNECT_BACKOFF}s"
|
||||
)
|
||||
await asyncio.sleep(CONNECT_BACKOFF)
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Bootstrap reconcile failed: "
|
||||
f"{type(e).__name__}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
# Treat as transient, fall through to long sleep so
|
||||
# we don't spin on persistent failures.
|
||||
|
||||
await asyncio.sleep(CYCLE_INTERVAL)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# CLI arg plumbing.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
AsyncProcessor.add_args(parser)
|
||||
|
||||
parser.add_argument(
|
||||
'-C', '--config-file',
|
||||
required=True,
|
||||
help='Path to the seed JSON config file '
|
||||
'({type: {key: value}} shape)',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-w', '--default-workspace',
|
||||
default=default_default_workspace,
|
||||
help=f'Name of the default workspace to bootstrap '
|
||||
f'(default: {default_default_workspace})',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--default-flow-id',
|
||||
default=default_default_flow_id,
|
||||
help=f'Flow id to start in the default workspace '
|
||||
f'(default: {default_default_flow_id})',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--default-flow-blueprint',
|
||||
default=None,
|
||||
help='Blueprint to use for the default flow. '
|
||||
'If not set, no default flow is started.',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--default-flow-description',
|
||||
default=default_default_flow_description,
|
||||
help=f'Description for the default flow '
|
||||
f'(default: "{default_default_flow_description}")',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--default-flow-parameters',
|
||||
default=None,
|
||||
help='Optional JSON string of parameters to pass to '
|
||||
'start-flow (e.g. \'{"llm-model":"..."}\').',
|
||||
)
|
||||
|
||||
|
||||
def run():
|
||||
Processor.launch(default_ident, __doc__)
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
"""
|
||||
Core bootstrap initialisers.
|
||||
|
||||
These cover the base TrustGraph deployment case. Enterprise or
|
||||
third-party initialisers live in their own packages and are
|
||||
referenced in the bootstrapper's config by fully-qualified dotted
|
||||
path.
|
||||
"""
|
||||
|
||||
from . pulsar_topology import PulsarTopology
|
||||
from . template_seed import TemplateSeed
|
||||
from . workspace_init import WorkspaceInit
|
||||
from . default_flow_start import DefaultFlowStart
|
||||
|
||||
__all__ = [
|
||||
"PulsarTopology",
|
||||
"TemplateSeed",
|
||||
"WorkspaceInit",
|
||||
"DefaultFlowStart",
|
||||
]
|
||||
|
|
@ -0,0 +1,101 @@
|
|||
"""
|
||||
DefaultFlowStart initialiser — starts a named flow in a workspace
|
||||
using a specified blueprint.
|
||||
|
||||
Separated from WorkspaceInit so deployments that want a workspace
|
||||
without an auto-started flow can simply omit this initialiser.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
workspace : str (default "default")
|
||||
Workspace in which to start the flow.
|
||||
flow_id : str (default "default")
|
||||
Identifier for the started flow.
|
||||
blueprint : str (required)
|
||||
Blueprint name (must already exist in the workspace's config,
|
||||
typically via TemplateSeed -> WorkspaceInit).
|
||||
description : str (default "Default")
|
||||
Human-readable description passed to flow-svc.
|
||||
parameters : dict (optional)
|
||||
Optional parameter overrides passed to start-flow.
|
||||
"""
|
||||
|
||||
from trustgraph.schema import FlowRequest
|
||||
|
||||
from .. base import Initialiser
|
||||
|
||||
|
||||
class DefaultFlowStart(Initialiser):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
workspace="default",
|
||||
flow_id="default",
|
||||
blueprint=None,
|
||||
description="Default",
|
||||
parameters=None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
if not blueprint:
|
||||
raise ValueError(
|
||||
"DefaultFlowStart requires 'blueprint'"
|
||||
)
|
||||
self.workspace = workspace
|
||||
self.flow_id = flow_id
|
||||
self.blueprint = blueprint
|
||||
self.description = description
|
||||
self.parameters = dict(parameters) if parameters else {}
|
||||
|
||||
async def run(self, ctx, old_flag, new_flag):
|
||||
|
||||
# Check whether the flow already exists. Belt-and-braces
|
||||
# beyond the flag gate: if an operator stops and restarts the
|
||||
# bootstrapper after the flow is already running, we don't
|
||||
# want to blindly try to start it again.
|
||||
list_resp = await ctx.flow.request(
|
||||
FlowRequest(
|
||||
operation="list-flows",
|
||||
workspace=self.workspace,
|
||||
),
|
||||
timeout=10,
|
||||
)
|
||||
if list_resp.error:
|
||||
raise RuntimeError(
|
||||
f"list-flows failed: "
|
||||
f"{list_resp.error.type}: {list_resp.error.message}"
|
||||
)
|
||||
|
||||
if self.flow_id in (list_resp.flow_ids or []):
|
||||
ctx.logger.info(
|
||||
f"Flow {self.flow_id!r} already running in workspace "
|
||||
f"{self.workspace!r}; nothing to do"
|
||||
)
|
||||
return
|
||||
|
||||
ctx.logger.info(
|
||||
f"Starting flow {self.flow_id!r} "
|
||||
f"(blueprint={self.blueprint!r}) "
|
||||
f"in workspace {self.workspace!r}"
|
||||
)
|
||||
|
||||
resp = await ctx.flow.request(
|
||||
FlowRequest(
|
||||
operation="start-flow",
|
||||
workspace=self.workspace,
|
||||
flow_id=self.flow_id,
|
||||
blueprint_name=self.blueprint,
|
||||
description=self.description,
|
||||
parameters=self.parameters,
|
||||
),
|
||||
timeout=30,
|
||||
)
|
||||
if resp.error:
|
||||
raise RuntimeError(
|
||||
f"start-flow failed: "
|
||||
f"{resp.error.type}: {resp.error.message}"
|
||||
)
|
||||
|
||||
ctx.logger.info(
|
||||
f"Flow {self.flow_id!r} started"
|
||||
)
|
||||
|
|
@ -0,0 +1,131 @@
|
|||
"""
|
||||
PulsarTopology initialiser — creates Pulsar tenant and namespaces
|
||||
with their retention policies.
|
||||
|
||||
Runs pre-gate (``wait_for_services = False``) because config-svc and
|
||||
flow-svc can't connect to Pulsar until these namespaces exist.
|
||||
Admin-API calls are idempotent so re-runs on flag change are safe.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import requests
|
||||
|
||||
from .. base import Initialiser
|
||||
|
||||
# Namespace configs. flow/request take broker defaults. response
|
||||
# and notify get aggressive retention — those classes carry short-lived
|
||||
# request/response and notification traffic only.
|
||||
NAMESPACE_CONFIG = {
|
||||
"flow": {},
|
||||
"request": {},
|
||||
"response": {
|
||||
"retention_policies": {
|
||||
"retentionSizeInMB": -1,
|
||||
"retentionTimeInMinutes": 3,
|
||||
"subscriptionExpirationTimeMinutes": 30,
|
||||
},
|
||||
},
|
||||
"notify": {
|
||||
"retention_policies": {
|
||||
"retentionSizeInMB": -1,
|
||||
"retentionTimeInMinutes": 3,
|
||||
"subscriptionExpirationTimeMinutes": 5,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
REQUEST_TIMEOUT = 10
|
||||
|
||||
|
||||
class PulsarTopology(Initialiser):
|
||||
|
||||
wait_for_services = False
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
admin_url="http://pulsar:8080",
|
||||
tenant="tg",
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self.admin_url = admin_url.rstrip("/")
|
||||
self.tenant = tenant
|
||||
|
||||
async def run(self, ctx, old_flag, new_flag):
|
||||
# requests is blocking; offload to executor so the loop stays
|
||||
# responsive.
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.run_in_executor(None, self._reconcile_sync, ctx.logger)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Sync admin-API calls.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _get_clusters(self):
|
||||
resp = requests.get(
|
||||
f"{self.admin_url}/admin/v2/clusters",
|
||||
timeout=REQUEST_TIMEOUT,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
def _tenant_exists(self):
|
||||
resp = requests.get(
|
||||
f"{self.admin_url}/admin/v2/tenants/{self.tenant}",
|
||||
timeout=REQUEST_TIMEOUT,
|
||||
)
|
||||
return resp.status_code == 200
|
||||
|
||||
def _create_tenant(self, clusters):
|
||||
resp = requests.put(
|
||||
f"{self.admin_url}/admin/v2/tenants/{self.tenant}",
|
||||
json={"adminRoles": [], "allowedClusters": clusters},
|
||||
timeout=REQUEST_TIMEOUT,
|
||||
)
|
||||
if resp.status_code != 204:
|
||||
raise RuntimeError(
|
||||
f"Tenant {self.tenant!r} create failed: "
|
||||
f"{resp.status_code} {resp.text}"
|
||||
)
|
||||
|
||||
def _namespace_exists(self, namespace):
|
||||
resp = requests.get(
|
||||
f"{self.admin_url}/admin/v2/namespaces/"
|
||||
f"{self.tenant}/{namespace}",
|
||||
timeout=REQUEST_TIMEOUT,
|
||||
)
|
||||
return resp.status_code == 200
|
||||
|
||||
def _create_namespace(self, namespace, config):
|
||||
resp = requests.put(
|
||||
f"{self.admin_url}/admin/v2/namespaces/"
|
||||
f"{self.tenant}/{namespace}",
|
||||
json=config,
|
||||
timeout=REQUEST_TIMEOUT,
|
||||
)
|
||||
if resp.status_code != 204:
|
||||
raise RuntimeError(
|
||||
f"Namespace {self.tenant}/{namespace} create failed: "
|
||||
f"{resp.status_code} {resp.text}"
|
||||
)
|
||||
|
||||
def _reconcile_sync(self, logger):
|
||||
if not self._tenant_exists():
|
||||
clusters = self._get_clusters()
|
||||
logger.info(
|
||||
f"Creating tenant {self.tenant!r} with clusters {clusters}"
|
||||
)
|
||||
self._create_tenant(clusters)
|
||||
else:
|
||||
logger.debug(f"Tenant {self.tenant!r} already exists")
|
||||
|
||||
for namespace, config in NAMESPACE_CONFIG.items():
|
||||
if self._namespace_exists(namespace):
|
||||
logger.debug(
|
||||
f"Namespace {self.tenant}/{namespace} already exists"
|
||||
)
|
||||
continue
|
||||
logger.info(
|
||||
f"Creating namespace {self.tenant}/{namespace}"
|
||||
)
|
||||
self._create_namespace(namespace, config)
|
||||
|
|
@ -0,0 +1,93 @@
|
|||
"""
|
||||
TemplateSeed initialiser — populates the reserved ``__template__``
|
||||
workspace from an external JSON seed file.
|
||||
|
||||
Seed file shape:
|
||||
|
||||
.. code-block:: json
|
||||
|
||||
{
|
||||
"flow-blueprint": {
|
||||
"ontology": { ... },
|
||||
"agent": { ... }
|
||||
},
|
||||
"prompt": {
|
||||
...
|
||||
},
|
||||
...
|
||||
}
|
||||
|
||||
Top-level keys are config types; nested keys are config entries.
|
||||
Values are arbitrary JSON (they'll be ``json.dumps()``'d on write).
|
||||
|
||||
Parameters
|
||||
----------
|
||||
config_file : str
|
||||
Path to the seed file on disk.
|
||||
overwrite : bool (default False)
|
||||
On re-run (flag change), if True overwrite all keys; if False
|
||||
upsert-missing-only (preserves any operator customisation of
|
||||
the template).
|
||||
"""
|
||||
|
||||
import json
|
||||
|
||||
from .. base import Initialiser
|
||||
|
||||
TEMPLATE_WORKSPACE = "__template__"
|
||||
|
||||
|
||||
class TemplateSeed(Initialiser):
|
||||
|
||||
def __init__(self, config_file, overwrite=False, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
if not config_file:
|
||||
raise ValueError("TemplateSeed requires 'config_file'")
|
||||
self.config_file = config_file
|
||||
self.overwrite = overwrite
|
||||
|
||||
async def run(self, ctx, old_flag, new_flag):
|
||||
|
||||
with open(self.config_file) as f:
|
||||
seed = json.load(f)
|
||||
|
||||
if old_flag is None:
|
||||
# Clean first run — write every entry.
|
||||
await self._write_all(ctx, seed)
|
||||
return
|
||||
|
||||
# Re-run after flag change.
|
||||
if self.overwrite:
|
||||
await self._write_all(ctx, seed)
|
||||
else:
|
||||
await self._upsert_missing(ctx, seed)
|
||||
|
||||
async def _write_all(self, ctx, seed):
|
||||
values = []
|
||||
for type_name, entries in seed.items():
|
||||
for key, value in entries.items():
|
||||
values.append((type_name, key, json.dumps(value)))
|
||||
if values:
|
||||
await ctx.config.put_many(TEMPLATE_WORKSPACE, values)
|
||||
ctx.logger.info(
|
||||
f"Template seeded with {len(values)} entries"
|
||||
)
|
||||
|
||||
async def _upsert_missing(self, ctx, seed):
|
||||
written = 0
|
||||
for type_name, entries in seed.items():
|
||||
existing = set(
|
||||
await ctx.config.keys(TEMPLATE_WORKSPACE, type_name)
|
||||
)
|
||||
values = []
|
||||
for key, value in entries.items():
|
||||
if key not in existing:
|
||||
values.append(
|
||||
(type_name, key, json.dumps(value))
|
||||
)
|
||||
if values:
|
||||
await ctx.config.put_many(TEMPLATE_WORKSPACE, values)
|
||||
written += len(values)
|
||||
ctx.logger.info(
|
||||
f"Template upsert-missing: {written} new entries"
|
||||
)
|
||||
|
|
@ -0,0 +1,154 @@
|
|||
"""
|
||||
WorkspaceInit initialiser — creates a workspace and populates it from
|
||||
either the ``__template__`` workspace or a seed file on disk.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
workspace : str
|
||||
Target workspace to create / populate.
|
||||
source : str
|
||||
Either ``"template"`` (copy from the ``__template__`` workspace)
|
||||
or ``"seed-file"`` (read from ``seed_file``).
|
||||
seed_file : str (required when source=="seed-file")
|
||||
Path to a JSON seed file with the same shape TemplateSeed consumes.
|
||||
types : list[str] (only when source=="template")
|
||||
Config types to copy from ``__template__``. Required when
|
||||
source is ``template`` because there is no config API for
|
||||
"list all types in a workspace".
|
||||
overwrite : bool (default False)
|
||||
On re-run (flag change), if True overwrite all keys; if False,
|
||||
upsert-missing-only (preserves in-workspace customisations).
|
||||
|
||||
Raises (in ``run``)
|
||||
-------------------
|
||||
When source is ``"template"``, raises ``RuntimeError`` if the
|
||||
``__template__`` workspace has no content of the requested types —
|
||||
indicating that TemplateSeed hasn't run yet. The bootstrapper's
|
||||
retry loop will re-attempt on the next cycle once the prerequisite
|
||||
is satisfied.
|
||||
"""
|
||||
|
||||
import json
|
||||
|
||||
from .. base import Initialiser
|
||||
|
||||
TEMPLATE_WORKSPACE = "__template__"
|
||||
|
||||
|
||||
class WorkspaceInit(Initialiser):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
workspace="default",
|
||||
source="template",
|
||||
seed_file=None,
|
||||
types=None,
|
||||
overwrite=False,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
if source not in ("template", "seed-file"):
|
||||
raise ValueError(
|
||||
f"WorkspaceInit: source must be 'template' or "
|
||||
f"'seed-file', got {source!r}"
|
||||
)
|
||||
if source == "seed-file" and not seed_file:
|
||||
raise ValueError(
|
||||
"WorkspaceInit: seed_file required when source='seed-file'"
|
||||
)
|
||||
if source == "template" and not types:
|
||||
raise ValueError(
|
||||
"WorkspaceInit: types required when source='template'"
|
||||
)
|
||||
|
||||
self.workspace = workspace
|
||||
self.source = source
|
||||
self.seed_file = seed_file
|
||||
self.types = list(types) if types else []
|
||||
self.overwrite = overwrite
|
||||
|
||||
async def run(self, ctx, old_flag, new_flag):
|
||||
if self.source == "seed-file":
|
||||
tree = self._load_seed_file()
|
||||
else:
|
||||
tree = await self._load_from_template(ctx)
|
||||
|
||||
if old_flag is None or self.overwrite:
|
||||
await self._write_all(ctx, tree)
|
||||
else:
|
||||
await self._upsert_missing(ctx, tree)
|
||||
|
||||
def _load_seed_file(self):
|
||||
with open(self.seed_file) as f:
|
||||
return json.load(f)
|
||||
|
||||
async def _load_from_template(self, ctx):
|
||||
"""Build a seed tree from __template__ for the configured
|
||||
types. Raises if nothing is present, so the bootstrapper
|
||||
knows the prerequisite isn't met yet."""
|
||||
|
||||
tree = {}
|
||||
total = 0
|
||||
for type_name in self.types:
|
||||
keys = await ctx.config.keys(TEMPLATE_WORKSPACE, type_name)
|
||||
if not keys:
|
||||
continue
|
||||
entries = {}
|
||||
for key in keys:
|
||||
raw = await ctx.config.get(
|
||||
TEMPLATE_WORKSPACE, type_name, key,
|
||||
)
|
||||
if raw is None:
|
||||
continue
|
||||
try:
|
||||
entries[key] = json.loads(raw)
|
||||
except Exception:
|
||||
entries[key] = raw
|
||||
total += 1
|
||||
if entries:
|
||||
tree[type_name] = entries
|
||||
|
||||
if total == 0:
|
||||
raise RuntimeError(
|
||||
f"Template workspace has no content for types "
|
||||
f"{self.types!r} — has TemplateSeed run yet?"
|
||||
)
|
||||
|
||||
ctx.logger.debug(
|
||||
f"Loaded {total} template entries across "
|
||||
f"{len(tree)} types"
|
||||
)
|
||||
return tree
|
||||
|
||||
async def _write_all(self, ctx, tree):
|
||||
values = []
|
||||
for type_name, entries in tree.items():
|
||||
for key, value in entries.items():
|
||||
values.append((type_name, key, json.dumps(value)))
|
||||
if values:
|
||||
await ctx.config.put_many(self.workspace, values)
|
||||
ctx.logger.info(
|
||||
f"Workspace {self.workspace!r} populated with "
|
||||
f"{len(values)} entries"
|
||||
)
|
||||
|
||||
async def _upsert_missing(self, ctx, tree):
|
||||
written = 0
|
||||
for type_name, entries in tree.items():
|
||||
existing = set(
|
||||
await ctx.config.keys(self.workspace, type_name)
|
||||
)
|
||||
values = []
|
||||
for key, value in entries.items():
|
||||
if key not in existing:
|
||||
values.append(
|
||||
(type_name, key, json.dumps(value))
|
||||
)
|
||||
if values:
|
||||
await ctx.config.put_many(self.workspace, values)
|
||||
written += len(values)
|
||||
ctx.logger.info(
|
||||
f"Workspace {self.workspace!r} upsert-missing: "
|
||||
f"{written} new entries"
|
||||
)
|
||||
|
|
@ -1 +0,0 @@
|
|||
from . service import *
|
||||
|
|
@ -1,6 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from . service import run
|
||||
|
||||
if __name__ == '__main__':
|
||||
run()
|
||||
|
|
@ -1,224 +0,0 @@
|
|||
"""
|
||||
Pulsar topology bootstrap service.
|
||||
|
||||
Runs a slow reconciliation loop that ensures the Pulsar tenant and
|
||||
namespaces TrustGraph requires are present, creating them if they
|
||||
aren't. All operations are idempotent, so the loop is a cheap no-op
|
||||
once the cluster has been initialised.
|
||||
|
||||
Only relevant when running on the Pulsar pub/sub backend — for
|
||||
RabbitMQ / Kafka this service can simply be omitted from the
|
||||
processor group.
|
||||
|
||||
Namespaces created:
|
||||
- ``<tenant>/flow`` — data-path topics (durable, no retention limit)
|
||||
- ``<tenant>/request`` — request-class topics
|
||||
- ``<tenant>/response`` — response-class topics (short retention)
|
||||
- ``<tenant>/notify`` — notify topics (very short retention)
|
||||
|
||||
This is a thin reconciler around the same admin-API calls
|
||||
``tg-init-trustgraph`` has always made; what's new is that it lives
|
||||
in a processor group, keeps retrying on failure, and doesn't need a
|
||||
dedicated container.
|
||||
|
||||
Cadence:
|
||||
- CYCLE_INTERVAL (600s) on success / steady state
|
||||
- CONNECT_BACKOFF (10s) on admin-API failure
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import requests
|
||||
|
||||
from trustgraph.base import AsyncProcessor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
default_ident = "pulsar-bootstrap"
|
||||
default_pulsar_admin_url = "http://pulsar:8080"
|
||||
default_tenant = "tg"
|
||||
|
||||
# Namespace configs. flow/request take broker defaults; response and
|
||||
# notify have aggressive retention to keep the broker lean (short-lived
|
||||
# reply and notification traffic only).
|
||||
NAMESPACE_CONFIG = {
|
||||
"flow": {},
|
||||
"request": {},
|
||||
"response": {
|
||||
"retention_policies": {
|
||||
"retentionSizeInMB": -1,
|
||||
"retentionTimeInMinutes": 3,
|
||||
"subscriptionExpirationTimeMinutes": 30,
|
||||
},
|
||||
},
|
||||
"notify": {
|
||||
"retention_policies": {
|
||||
"retentionSizeInMB": -1,
|
||||
"retentionTimeInMinutes": 3,
|
||||
"subscriptionExpirationTimeMinutes": 5,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
CYCLE_INTERVAL = 600
|
||||
CONNECT_BACKOFF = 10
|
||||
REQUEST_TIMEOUT = 10
|
||||
|
||||
|
||||
class Processor(AsyncProcessor):
|
||||
|
||||
def __init__(self, **params):
|
||||
|
||||
super().__init__(**params)
|
||||
|
||||
self.pulsar_admin_url = params.get(
|
||||
"pulsar_admin_url", default_pulsar_admin_url,
|
||||
)
|
||||
self.tenant = params.get("tenant", default_tenant)
|
||||
|
||||
logger.info(
|
||||
f"Pulsar bootstrap: admin={self.pulsar_admin_url} "
|
||||
f"tenant={self.tenant}"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Admin API helpers — synchronous requests, wrapped so the loop can
|
||||
# treat them like any other awaitable. Running them on the default
|
||||
# executor keeps the event loop responsive.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _get_clusters(self):
|
||||
resp = requests.get(
|
||||
f"{self.pulsar_admin_url}/admin/v2/clusters",
|
||||
timeout=REQUEST_TIMEOUT,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
def _tenant_exists(self):
|
||||
resp = requests.get(
|
||||
f"{self.pulsar_admin_url}/admin/v2/tenants/{self.tenant}",
|
||||
timeout=REQUEST_TIMEOUT,
|
||||
)
|
||||
return resp.status_code == 200
|
||||
|
||||
def _create_tenant(self, clusters):
|
||||
resp = requests.put(
|
||||
f"{self.pulsar_admin_url}/admin/v2/tenants/{self.tenant}",
|
||||
json={
|
||||
"adminRoles": [],
|
||||
"allowedClusters": clusters,
|
||||
},
|
||||
timeout=REQUEST_TIMEOUT,
|
||||
)
|
||||
if resp.status_code != 204:
|
||||
raise RuntimeError(
|
||||
f"Tenant {self.tenant} create failed: "
|
||||
f"{resp.status_code} {resp.text}"
|
||||
)
|
||||
|
||||
def _namespace_exists(self, namespace):
|
||||
resp = requests.get(
|
||||
f"{self.pulsar_admin_url}/admin/v2/namespaces/"
|
||||
f"{self.tenant}/{namespace}",
|
||||
timeout=REQUEST_TIMEOUT,
|
||||
)
|
||||
return resp.status_code == 200
|
||||
|
||||
def _create_namespace(self, namespace, config):
|
||||
resp = requests.put(
|
||||
f"{self.pulsar_admin_url}/admin/v2/namespaces/"
|
||||
f"{self.tenant}/{namespace}",
|
||||
json=config,
|
||||
timeout=REQUEST_TIMEOUT,
|
||||
)
|
||||
if resp.status_code != 204:
|
||||
raise RuntimeError(
|
||||
f"Namespace {self.tenant}/{namespace} create failed: "
|
||||
f"{resp.status_code} {resp.text}"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Reconcile step — offload blocking HTTP to the executor so we
|
||||
# don't stall the event loop.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _reconcile_sync(self):
|
||||
"""Do the actual admin-API work on a thread."""
|
||||
|
||||
if not self._tenant_exists():
|
||||
clusters = self._get_clusters()
|
||||
logger.info(
|
||||
f"Creating tenant {self.tenant} with clusters {clusters}"
|
||||
)
|
||||
self._create_tenant(clusters)
|
||||
else:
|
||||
logger.debug(f"Tenant {self.tenant} already exists.")
|
||||
|
||||
for namespace, config in NAMESPACE_CONFIG.items():
|
||||
if self._namespace_exists(namespace):
|
||||
logger.debug(
|
||||
f"Namespace {self.tenant}/{namespace} already exists."
|
||||
)
|
||||
continue
|
||||
logger.info(
|
||||
f"Creating namespace {self.tenant}/{namespace}"
|
||||
)
|
||||
self._create_namespace(namespace, config)
|
||||
|
||||
async def _reconcile_once(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.run_in_executor(None, self._reconcile_sync)
|
||||
|
||||
async def run(self):
|
||||
|
||||
logger.info("Pulsar bootstrap loop starting.")
|
||||
|
||||
while self.running:
|
||||
|
||||
try:
|
||||
await self._reconcile_once()
|
||||
|
||||
except (requests.ConnectionError, requests.Timeout) as e:
|
||||
logger.info(
|
||||
f"Pulsar bootstrap: admin API unreachable "
|
||||
f"({type(e).__name__}: {e}); retry in "
|
||||
f"{CONNECT_BACKOFF}s"
|
||||
)
|
||||
await asyncio.sleep(CONNECT_BACKOFF)
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Pulsar bootstrap failed: "
|
||||
f"{type(e).__name__}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
# Treat as transient, long sleep to avoid hot loop.
|
||||
|
||||
await asyncio.sleep(CYCLE_INTERVAL)
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser: ArgumentParser) -> None:
|
||||
|
||||
AsyncProcessor.add_args(parser)
|
||||
|
||||
parser.add_argument(
|
||||
'--pulsar-admin-url',
|
||||
default=default_pulsar_admin_url,
|
||||
help=f'Pulsar admin REST URL '
|
||||
f'(default: {default_pulsar_admin_url})',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-t', '--tenant',
|
||||
default=default_tenant,
|
||||
help=f'Pulsar tenant to create/maintain '
|
||||
f'(default: {default_tenant})',
|
||||
)
|
||||
|
||||
|
||||
def run():
|
||||
Processor.launch(default_ident, __doc__)
|
||||
Loading…
Add table
Add a link
Reference in a new issue