fix: IAM bootstrap atomicity and bootstrapper startup ordering (#935)

IAM auto-bootstrap could get permanently stuck in a half-done state:
_seed_tables wrote the workspace first, so any_workspace_exists()
returned true on restart even when user/key/signing-key creation had
failed.  Remove workspace creation from _seed_tables (WorkspaceInit
handles it) and use any_signing_key_exists() as the completion check
since the signing key is the last thing written.

Run pre-service initialisers (PulsarTopology) in start() before
opening pub/sub connections, breaking the chicken-and-egg where the
bootstrapper needed Pulsar namespaces that it was responsible for
creating.  Guard against empty cluster list when broker isn't ready.
This commit is contained in:
cybermaggedon 2026-05-18 22:08:12 +01:00 committed by GitHub
parent 76e3358ed3
commit 29d3100c46
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 74 additions and 35 deletions

View file

@ -326,6 +326,58 @@ class Processor(AsyncProcessor):
# Main loop. # Main loop.
# ------------------------------------------------------------------ # ------------------------------------------------------------------
async def _run_pre_service(self):
"""Run pre-service initialisers before opening pub/sub clients.
These bring up infrastructure that other services depend on
(e.g. Pulsar tenant/namespaces). They use out-of-band APIs
(HTTP admin), not pub/sub, so they don't need a config client.
They run without flag tracking they must be idempotent.
"""
pre_specs = [
s for s in self.specs
if not s.instance.wait_for_services
]
if not pre_specs:
return
for spec in pre_specs:
child_logger = logger.getChild(spec.name)
child_ctx = InitContext(
logger=child_logger,
config=None,
make_flow_client=self._make_flow_client,
make_iam_client=self._make_iam_client,
)
child_logger.info(f"Running pre-service initialiser")
try:
await spec.instance.run(child_ctx, None, spec.flag)
child_logger.info(f"Pre-service initialiser completed")
except Exception as e:
child_logger.error(
f"Pre-service initialiser failed: "
f"{type(e).__name__}: {e}",
exc_info=True,
)
raise
async def start(self):
# Run pre-service initialisers before opening any pub/sub
# connections. They bring up infrastructure (Pulsar
# namespaces, etc.) that super().start() depends on.
while self.running:
try:
await self._run_pre_service()
break
except Exception as e:
logger.info(
f"Pre-service initialisation failed "
f"({type(e).__name__}: {e}); retry in {GATE_BACKOFF}s"
)
await asyncio.sleep(GATE_BACKOFF)
await super().start()
async def run(self): async def run(self):
logger.info( logger.info(
@ -347,29 +399,18 @@ class Processor(AsyncProcessor):
continue continue
try: try:
# Phase 1: pre-service initialisers run unconditionally. # Phase 1: gate.
pre_specs = [
s for s in self.specs
if not s.instance.wait_for_services
]
pre_results = {}
for spec in pre_specs:
pre_results[spec.name] = await self._run_spec(
spec, config,
)
# Phase 2: gate.
gate_ok = await self._gate_ready(config) gate_ok = await self._gate_ready(config)
# Phase 3: post-service initialisers, if gate passed. # Phase 2: post-service initialisers, if gate passed.
post_results = {} results = {}
if gate_ok: if gate_ok:
post_specs = [ post_specs = [
s for s in self.specs s for s in self.specs
if s.instance.wait_for_services if s.instance.wait_for_services
] ]
for spec in post_specs: for spec in post_specs:
post_results[spec.name] = await self._run_spec( results[spec.name] = await self._run_spec(
spec, config, spec, config,
) )
@ -377,8 +418,7 @@ class Processor(AsyncProcessor):
if not gate_ok: if not gate_ok:
sleep_for = GATE_BACKOFF sleep_for = GATE_BACKOFF
else: else:
all_results = {**pre_results, **post_results} if any(r != "skip" for r in results.values()):
if any(r != "skip" for r in all_results.values()):
sleep_for = INIT_RETRY sleep_for = INIT_RETRY
else: else:
sleep_for = STEADY_INTERVAL sleep_for = STEADY_INTERVAL

View file

@ -112,6 +112,10 @@ class PulsarTopology(Initialiser):
def _reconcile_sync(self, logger): def _reconcile_sync(self, logger):
if not self._tenant_exists(): if not self._tenant_exists():
clusters = self._get_clusters() clusters = self._get_clusters()
if not clusters:
raise RuntimeError(
"Pulsar cluster list is empty — broker not ready yet"
)
logger.info( logger.info(
f"Creating tenant {self.tenant!r} with clusters {clusters}" f"Creating tenant {self.tenant!r} with clusters {clusters}"
) )

View file

@ -397,8 +397,8 @@ class IamService:
async def auto_bootstrap_if_token_mode(self): async def auto_bootstrap_if_token_mode(self):
"""Called from the service processor at startup. In """Called from the service processor at startup. In
``token`` mode, if tables are empty, seeds the default ``token`` mode, if tables are empty, seeds the admin user,
workspace / admin / signing key using the operator-provided API key, and signing key using the operator-provided
bootstrap token. The admin's API key plaintext is *the* bootstrap token. The admin's API key plaintext is *the*
``bootstrap_token`` the operator already knows it, nothing ``bootstrap_token`` the operator already knows it, nothing
needs to be returned or logged. needs to be returned or logged.
@ -408,7 +408,7 @@ class IamService:
if self.bootstrap_mode != "token": if self.bootstrap_mode != "token":
return return
if await self.table_store.any_workspace_exists(): if await self.table_store.any_signing_key_exists():
logger.info( logger.info(
"IAM: token mode, tables already populated; skipping " "IAM: token mode, tables already populated; skipping "
"auto-bootstrap" "auto-bootstrap"
@ -423,22 +423,13 @@ class IamService:
async def _seed_tables(self, api_key_plaintext): async def _seed_tables(self, api_key_plaintext):
"""Shared seeding logic used by token-mode auto-bootstrap and """Shared seeding logic used by token-mode auto-bootstrap and
bootstrap-mode handle_bootstrap. Creates the default bootstrap-mode handle_bootstrap. Creates the admin user,
workspace, admin user, admin API key (using the given admin API key (using the given plaintext), and an initial
plaintext), and an initial signing key. Returns the admin signing key. The workspace is created separately by the
bootstrapper's WorkspaceInit initialiser. Returns the admin
user id.""" user id."""
now = _now_dt() now = _now_dt()
await self.table_store.put_workspace(
id=DEFAULT_WORKSPACE,
name="Default",
enabled=True,
created=now,
)
if self._on_workspace_created:
await self._on_workspace_created(DEFAULT_WORKSPACE)
admin_user_id = str(uuid.uuid4()) admin_user_id = str(uuid.uuid4())
admin_password = secrets.token_urlsafe(32) admin_password = secrets.token_urlsafe(32)
await self.table_store.put_user( await self.table_store.put_user(
@ -491,7 +482,7 @@ class IamService:
if self.bootstrap_mode != "bootstrap": if self.bootstrap_mode != "bootstrap":
return _err("auth-failed", "auth failure") return _err("auth-failed", "auth failure")
if await self.table_store.any_workspace_exists(): if await self.table_store.any_signing_key_exists():
return _err("auth-failed", "auth failure") return _err("auth-failed", "auth failure")
plaintext = _generate_api_key() plaintext = _generate_api_key()
@ -531,7 +522,7 @@ class IamService:
instead of forcing callers to probe the masked-failure path.""" instead of forcing callers to probe the masked-failure path."""
available = ( available = (
self.bootstrap_mode == "bootstrap" self.bootstrap_mode == "bootstrap"
and not await self.table_store.any_workspace_exists() and not await self.table_store.any_signing_key_exists()
) )
return IamResponse(bootstrap_available=available) return IamResponse(bootstrap_available=available)

View file

@ -435,3 +435,7 @@ class IamTableStore:
async def any_workspace_exists(self): async def any_workspace_exists(self):
rows = await self.list_workspaces() rows = await self.list_workspaces()
return bool(rows) return bool(rows)
async def any_signing_key_exists(self):
rows = await self.list_signing_keys()
return bool(rows)