diff --git a/trustgraph-flow/trustgraph/bootstrap/base.py b/trustgraph-flow/trustgraph/bootstrap/base.py index cb022a16..108e441c 100644 --- a/trustgraph-flow/trustgraph/bootstrap/base.py +++ b/trustgraph-flow/trustgraph/bootstrap/base.py @@ -21,7 +21,7 @@ class InitContext: logger: logging.Logger config: Any # ConfigClient - flow: Any # RequestResponse client for flow-svc + make_flow_client: Any # callable(workspace) -> RequestResponse class Initialiser: diff --git a/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py index eb6238d3..7b63a1af 100644 --- a/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py +++ b/trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py @@ -178,13 +178,13 @@ class Processor(AsyncProcessor): ), ) - def _make_flow_client(self): + def _make_flow_client(self, workspace): rr_id = str(uuid.uuid4()) return RequestResponse( backend=self.pubsub_backend, subscription=f"{self.id}--flow--{rr_id}", consumer_name=self.id, - request_topic=flow_request_queue, + request_topic=f"{flow_request_queue}:{workspace}", request_schema=FlowRequest, request_metrics=ProducerMetrics( processor=self.id, flow=None, name="flow-request", @@ -198,14 +198,8 @@ class Processor(AsyncProcessor): async def _open_clients(self): config = self._make_config_client() - flow = self._make_flow_client() await config.start() - try: - await flow.start() - except Exception: - await self._safe_stop(config) - raise - return config, flow + return config async def _safe_stop(self, client): try: @@ -217,7 +211,14 @@ class Processor(AsyncProcessor): # Service gate. # ------------------------------------------------------------------ - async def _gate_ready(self, config, flow): + def _gate_workspace(self): + for spec in self.specs: + ws = getattr(spec.instance, "workspace", None) + if ws and not ws.startswith("_"): + return ws + return None + + async def _gate_ready(self, config): try: await config.keys(SYSTEM_WORKSPACE, INIT_STATE_TYPE) except Exception as e: @@ -226,11 +227,16 @@ class Processor(AsyncProcessor): ) return False + workspace = self._gate_workspace() + if workspace is None: + return True + + flow = self._make_flow_client(workspace) try: + await flow.start() resp = await flow.request( FlowRequest( operation="list-blueprints", - workspace=SYSTEM_WORKSPACE, ), timeout=5, ) @@ -245,6 +251,8 @@ class Processor(AsyncProcessor): f"Gate: flow-svc not ready ({type(e).__name__}: {e})" ) return False + finally: + await self._safe_stop(flow) return True @@ -271,7 +279,7 @@ class Processor(AsyncProcessor): # Per-spec execution. # ------------------------------------------------------------------ - async def _run_spec(self, spec, config, flow): + async def _run_spec(self, spec, config): """Run a single initialiser spec. Returns one of: @@ -298,7 +306,7 @@ class Processor(AsyncProcessor): child_ctx = InitContext( logger=child_logger, config=config, - flow=flow, + make_flow_client=self._make_flow_client, ) child_logger.info( @@ -340,7 +348,7 @@ class Processor(AsyncProcessor): sleep_for = STEADY_INTERVAL try: - config, flow = await self._open_clients() + config = await self._open_clients() except Exception as e: logger.info( f"Failed to open clients " @@ -358,11 +366,11 @@ class Processor(AsyncProcessor): pre_results = {} for spec in pre_specs: pre_results[spec.name] = await self._run_spec( - spec, config, flow, + spec, config, ) # Phase 2: gate. - gate_ok = await self._gate_ready(config, flow) + gate_ok = await self._gate_ready(config) # Phase 3: post-service initialisers, if gate passed. post_results = {} @@ -373,7 +381,7 @@ class Processor(AsyncProcessor): ] for spec in post_specs: post_results[spec.name] = await self._run_spec( - spec, config, flow, + spec, config, ) # Cadence selection. @@ -388,7 +396,6 @@ class Processor(AsyncProcessor): finally: await self._safe_stop(config) - await self._safe_stop(flow) await asyncio.sleep(sleep_for) diff --git a/trustgraph-flow/trustgraph/bootstrap/initialisers/default_flow_start.py b/trustgraph-flow/trustgraph/bootstrap/initialisers/default_flow_start.py index 7e7f96bd..cf5f3dd9 100644 --- a/trustgraph-flow/trustgraph/bootstrap/initialisers/default_flow_start.py +++ b/trustgraph-flow/trustgraph/bootstrap/initialisers/default_flow_start.py @@ -49,53 +49,59 @@ class DefaultFlowStart(Initialiser): async def run(self, ctx, old_flag, new_flag): - # Check whether the flow already exists. Belt-and-braces - # beyond the flag gate: if an operator stops and restarts the - # bootstrapper after the flow is already running, we don't - # want to blindly try to start it again. - list_resp = await ctx.flow.request( - FlowRequest( - operation="list-flows", - workspace=self.workspace, - ), - timeout=10, - ) - if list_resp.error: - raise RuntimeError( - f"list-flows failed: " - f"{list_resp.error.type}: {list_resp.error.message}" - ) + flow = ctx.make_flow_client(self.workspace) + await flow.start() + + try: + + # Check whether the flow already exists. Belt-and-braces + # beyond the flag gate: if an operator stops and restarts the + # bootstrapper after the flow is already running, we don't + # want to blindly try to start it again. + list_resp = await flow.request( + FlowRequest( + operation="list-flows", + ), + timeout=10, + ) + if list_resp.error: + raise RuntimeError( + f"list-flows failed: " + f"{list_resp.error.type}: {list_resp.error.message}" + ) + + if self.flow_id in (list_resp.flow_ids or []): + ctx.logger.info( + f"Flow {self.flow_id!r} already running in workspace " + f"{self.workspace!r}; nothing to do" + ) + return - if self.flow_id in (list_resp.flow_ids or []): ctx.logger.info( - f"Flow {self.flow_id!r} already running in workspace " - f"{self.workspace!r}; nothing to do" - ) - return - - ctx.logger.info( - f"Starting flow {self.flow_id!r} " - f"(blueprint={self.blueprint!r}) " - f"in workspace {self.workspace!r}" - ) - - resp = await ctx.flow.request( - FlowRequest( - operation="start-flow", - workspace=self.workspace, - flow_id=self.flow_id, - blueprint_name=self.blueprint, - description=self.description, - parameters=self.parameters, - ), - timeout=30, - ) - if resp.error: - raise RuntimeError( - f"start-flow failed: " - f"{resp.error.type}: {resp.error.message}" + f"Starting flow {self.flow_id!r} " + f"(blueprint={self.blueprint!r}) " + f"in workspace {self.workspace!r}" ) - ctx.logger.info( - f"Flow {self.flow_id!r} started" - ) + resp = await flow.request( + FlowRequest( + operation="start-flow", + flow_id=self.flow_id, + blueprint_name=self.blueprint, + description=self.description, + parameters=self.parameters, + ), + timeout=30, + ) + if resp.error: + raise RuntimeError( + f"start-flow failed: " + f"{resp.error.type}: {resp.error.message}" + ) + + ctx.logger.info( + f"Flow {self.flow_id!r} started" + ) + + finally: + await flow.stop()