diff --git a/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py index 3c658fe3..81b7e98d 100644 --- a/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py +++ b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py @@ -326,6 +326,58 @@ class Processor(AsyncProcessor): # Main loop. # ------------------------------------------------------------------ + async def _run_pre_service(self): + """Run pre-service initialisers before opening pub/sub clients. + + These bring up infrastructure that other services depend on + (e.g. Pulsar tenant/namespaces). They use out-of-band APIs + (HTTP admin), not pub/sub, so they don't need a config client. + They run without flag tracking — they must be idempotent. + """ + pre_specs = [ + s for s in self.specs + if not s.instance.wait_for_services + ] + if not pre_specs: + return + + for spec in pre_specs: + child_logger = logger.getChild(spec.name) + child_ctx = InitContext( + logger=child_logger, + config=None, + make_flow_client=self._make_flow_client, + make_iam_client=self._make_iam_client, + ) + child_logger.info(f"Running pre-service initialiser") + try: + await spec.instance.run(child_ctx, None, spec.flag) + child_logger.info(f"Pre-service initialiser completed") + except Exception as e: + child_logger.error( + f"Pre-service initialiser failed: " + f"{type(e).__name__}: {e}", + exc_info=True, + ) + raise + + async def start(self): + # Run pre-service initialisers before opening any pub/sub + # connections. They bring up infrastructure (Pulsar + # namespaces, etc.) that super().start() depends on. + while self.running: + try: + await self._run_pre_service() + break + except Exception as e: + logger.info( + f"Pre-service initialisation failed " + f"({type(e).__name__}: {e}); retry in {GATE_BACKOFF}s" + ) + await asyncio.sleep(GATE_BACKOFF) + + await super().start() + async def run(self): logger.info( @@ -347,29 +399,18 @@ class Processor(AsyncProcessor): continue try: - # Phase 1: pre-service initialisers run unconditionally. - pre_specs = [ - s for s in self.specs - if not s.instance.wait_for_services - ] - pre_results = {} - for spec in pre_specs: - pre_results[spec.name] = await self._run_spec( - spec, config, - ) - - # Phase 2: gate. + # Phase 1: gate. gate_ok = await self._gate_ready(config) - # Phase 3: post-service initialisers, if gate passed. - post_results = {} + # Phase 2: post-service initialisers, if gate passed. + results = {} if gate_ok: post_specs = [ s for s in self.specs if s.instance.wait_for_services ] for spec in post_specs: - post_results[spec.name] = await self._run_spec( + results[spec.name] = await self._run_spec( spec, config, ) @@ -377,8 +418,7 @@ class Processor(AsyncProcessor): if not gate_ok: sleep_for = GATE_BACKOFF else: - all_results = {**pre_results, **post_results} - if any(r != "skip" for r in all_results.values()): + if any(r != "skip" for r in results.values()): sleep_for = INIT_RETRY else: sleep_for = STEADY_INTERVAL diff --git a/trustgraph-flow/trustgraph/bootstrap/initialisers/pulsar_topology.py b/trustgraph-flow/trustgraph/bootstrap/initialisers/pulsar_topology.py index 843fe056..1e4805de 100644 --- a/trustgraph-flow/trustgraph/bootstrap/initialisers/pulsar_topology.py +++ b/trustgraph-flow/trustgraph/bootstrap/initialisers/pulsar_topology.py @@ -112,6 +112,10 @@ class PulsarTopology(Initialiser): def _reconcile_sync(self, logger): if not self._tenant_exists(): clusters = self._get_clusters() + if not clusters: + raise RuntimeError( + "Pulsar cluster list is empty — broker not ready yet" + ) logger.info( f"Creating tenant {self.tenant!r} with clusters {clusters}" ) diff --git a/trustgraph-flow/trustgraph/iam/service/iam.py b/trustgraph-flow/trustgraph/iam/service/iam.py index 7beaf5ed..0335012e 100644 --- a/trustgraph-flow/trustgraph/iam/service/iam.py +++ b/trustgraph-flow/trustgraph/iam/service/iam.py @@ -397,8 +397,8 @@ class IamService: async def auto_bootstrap_if_token_mode(self): """Called from the service processor at startup. In - ``token`` mode, if tables are empty, seeds the default - workspace / admin / signing key using the operator-provided + ``token`` mode, if tables are empty, seeds the admin user, + API key, and signing key using the operator-provided bootstrap token. The admin's API key plaintext is *the* ``bootstrap_token`` — the operator already knows it, nothing needs to be returned or logged. @@ -408,7 +408,7 @@ class IamService: if self.bootstrap_mode != "token": return - if await self.table_store.any_workspace_exists(): + if await self.table_store.any_signing_key_exists(): logger.info( "IAM: token mode, tables already populated; skipping " "auto-bootstrap" @@ -423,22 +423,13 @@ class IamService: async def _seed_tables(self, api_key_plaintext): """Shared seeding logic used by token-mode auto-bootstrap and - bootstrap-mode handle_bootstrap. Creates the default - workspace, admin user, admin API key (using the given - plaintext), and an initial signing key. Returns the admin + bootstrap-mode handle_bootstrap. Creates the admin user, + admin API key (using the given plaintext), and an initial + signing key. The workspace is created separately by the + bootstrapper's WorkspaceInit initialiser. Returns the admin user id.""" now = _now_dt() - await self.table_store.put_workspace( - id=DEFAULT_WORKSPACE, - name="Default", - enabled=True, - created=now, - ) - - if self._on_workspace_created: - await self._on_workspace_created(DEFAULT_WORKSPACE) - admin_user_id = str(uuid.uuid4()) admin_password = secrets.token_urlsafe(32) await self.table_store.put_user( @@ -491,7 +482,7 @@ class IamService: if self.bootstrap_mode != "bootstrap": return _err("auth-failed", "auth failure") - if await self.table_store.any_workspace_exists(): + if await self.table_store.any_signing_key_exists(): return _err("auth-failed", "auth failure") plaintext = _generate_api_key() @@ -531,7 +522,7 @@ class IamService: instead of forcing callers to probe the masked-failure path.""" available = ( self.bootstrap_mode == "bootstrap" - and not await self.table_store.any_workspace_exists() + and not await self.table_store.any_signing_key_exists() ) return IamResponse(bootstrap_available=available) diff --git a/trustgraph-flow/trustgraph/tables/iam.py b/trustgraph-flow/trustgraph/tables/iam.py index 8bf9c8b4..d7bf5e3d 100644 --- a/trustgraph-flow/trustgraph/tables/iam.py +++ b/trustgraph-flow/trustgraph/tables/iam.py @@ -435,3 +435,7 @@ class IamTableStore: async def any_workspace_exists(self): rows = await self.list_workspaces() return bool(rows) + + async def any_signing_key_exists(self): + rows = await self.list_signing_keys() + return bool(rows)