Fix initial flow

This commit is contained in:
Cyber MacGeddon 2026-05-01 16:45:34 +01:00
parent 115e325071
commit 3689e75ffa
3 changed files with 78 additions and 65 deletions

View file

@ -21,7 +21,7 @@ class InitContext:
logger: logging.Logger
config: Any # ConfigClient
flow: Any # RequestResponse client for flow-svc
make_flow_client: Any # callable(workspace) -> RequestResponse
class Initialiser:

View file

@ -178,13 +178,13 @@ class Processor(AsyncProcessor):
),
)
def _make_flow_client(self):
def _make_flow_client(self, workspace):
rr_id = str(uuid.uuid4())
return RequestResponse(
backend=self.pubsub_backend,
subscription=f"{self.id}--flow--{rr_id}",
consumer_name=self.id,
request_topic=flow_request_queue,
request_topic=f"{flow_request_queue}:{workspace}",
request_schema=FlowRequest,
request_metrics=ProducerMetrics(
processor=self.id, flow=None, name="flow-request",
@ -198,14 +198,8 @@ class Processor(AsyncProcessor):
async def _open_clients(self):
config = self._make_config_client()
flow = self._make_flow_client()
await config.start()
try:
await flow.start()
except Exception:
await self._safe_stop(config)
raise
return config, flow
return config
async def _safe_stop(self, client):
try:
@ -217,7 +211,14 @@ class Processor(AsyncProcessor):
# Service gate.
# ------------------------------------------------------------------
async def _gate_ready(self, config, flow):
def _gate_workspace(self):
for spec in self.specs:
ws = getattr(spec.instance, "workspace", None)
if ws and not ws.startswith("_"):
return ws
return None
async def _gate_ready(self, config):
try:
await config.keys(SYSTEM_WORKSPACE, INIT_STATE_TYPE)
except Exception as e:
@ -226,11 +227,16 @@ class Processor(AsyncProcessor):
)
return False
workspace = self._gate_workspace()
if workspace is None:
return True
flow = self._make_flow_client(workspace)
try:
await flow.start()
resp = await flow.request(
FlowRequest(
operation="list-blueprints",
workspace=SYSTEM_WORKSPACE,
),
timeout=5,
)
@ -245,6 +251,8 @@ class Processor(AsyncProcessor):
f"Gate: flow-svc not ready ({type(e).__name__}: {e})"
)
return False
finally:
await self._safe_stop(flow)
return True
@ -271,7 +279,7 @@ class Processor(AsyncProcessor):
# Per-spec execution.
# ------------------------------------------------------------------
async def _run_spec(self, spec, config, flow):
async def _run_spec(self, spec, config):
"""Run a single initialiser spec.
Returns one of:
@ -298,7 +306,7 @@ class Processor(AsyncProcessor):
child_ctx = InitContext(
logger=child_logger,
config=config,
flow=flow,
make_flow_client=self._make_flow_client,
)
child_logger.info(
@ -340,7 +348,7 @@ class Processor(AsyncProcessor):
sleep_for = STEADY_INTERVAL
try:
config, flow = await self._open_clients()
config = await self._open_clients()
except Exception as e:
logger.info(
f"Failed to open clients "
@ -358,11 +366,11 @@ class Processor(AsyncProcessor):
pre_results = {}
for spec in pre_specs:
pre_results[spec.name] = await self._run_spec(
spec, config, flow,
spec, config,
)
# Phase 2: gate.
gate_ok = await self._gate_ready(config, flow)
gate_ok = await self._gate_ready(config)
# Phase 3: post-service initialisers, if gate passed.
post_results = {}
@ -373,7 +381,7 @@ class Processor(AsyncProcessor):
]
for spec in post_specs:
post_results[spec.name] = await self._run_spec(
spec, config, flow,
spec, config,
)
# Cadence selection.
@ -388,7 +396,6 @@ class Processor(AsyncProcessor):
finally:
await self._safe_stop(config)
await self._safe_stop(flow)
await asyncio.sleep(sleep_for)

View file

@ -49,53 +49,59 @@ class DefaultFlowStart(Initialiser):
async def run(self, ctx, old_flag, new_flag):
# Check whether the flow already exists. Belt-and-braces
# beyond the flag gate: if an operator stops and restarts the
# bootstrapper after the flow is already running, we don't
# want to blindly try to start it again.
list_resp = await ctx.flow.request(
FlowRequest(
operation="list-flows",
workspace=self.workspace,
),
timeout=10,
)
if list_resp.error:
raise RuntimeError(
f"list-flows failed: "
f"{list_resp.error.type}: {list_resp.error.message}"
)
flow = ctx.make_flow_client(self.workspace)
await flow.start()
try:
# Check whether the flow already exists. Belt-and-braces
# beyond the flag gate: if an operator stops and restarts the
# bootstrapper after the flow is already running, we don't
# want to blindly try to start it again.
list_resp = await flow.request(
FlowRequest(
operation="list-flows",
),
timeout=10,
)
if list_resp.error:
raise RuntimeError(
f"list-flows failed: "
f"{list_resp.error.type}: {list_resp.error.message}"
)
if self.flow_id in (list_resp.flow_ids or []):
ctx.logger.info(
f"Flow {self.flow_id!r} already running in workspace "
f"{self.workspace!r}; nothing to do"
)
return
if self.flow_id in (list_resp.flow_ids or []):
ctx.logger.info(
f"Flow {self.flow_id!r} already running in workspace "
f"{self.workspace!r}; nothing to do"
)
return
ctx.logger.info(
f"Starting flow {self.flow_id!r} "
f"(blueprint={self.blueprint!r}) "
f"in workspace {self.workspace!r}"
)
resp = await ctx.flow.request(
FlowRequest(
operation="start-flow",
workspace=self.workspace,
flow_id=self.flow_id,
blueprint_name=self.blueprint,
description=self.description,
parameters=self.parameters,
),
timeout=30,
)
if resp.error:
raise RuntimeError(
f"start-flow failed: "
f"{resp.error.type}: {resp.error.message}"
f"Starting flow {self.flow_id!r} "
f"(blueprint={self.blueprint!r}) "
f"in workspace {self.workspace!r}"
)
ctx.logger.info(
f"Flow {self.flow_id!r} started"
)
resp = await flow.request(
FlowRequest(
operation="start-flow",
flow_id=self.flow_id,
blueprint_name=self.blueprint,
description=self.description,
parameters=self.parameters,
),
timeout=30,
)
if resp.error:
raise RuntimeError(
f"start-flow failed: "
f"{resp.error.type}: {resp.error.message}"
)
ctx.logger.info(
f"Flow {self.flow_id!r} started"
)
finally:
await flow.stop()