From 424ace44c436fd31b2dc8ecd48baf96f1f38aa3f Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 21 Apr 2026 21:30:19 +0100 Subject: [PATCH] Fix library queue lifecycle (#838) * Don't delete the global queues (librarian) when flows are deleted * 60s heartbeat timeouts on RabbitMQ --- .../trustgraph/base/rabbitmq_backend.py | 10 +- .../trustgraph/flow/service/flow.py | 135 +++++++++++++++++- 2 files changed, 140 insertions(+), 5 deletions(-) diff --git a/trustgraph-base/trustgraph/base/rabbitmq_backend.py b/trustgraph-base/trustgraph/base/rabbitmq_backend.py index 73b80cb9..a8133a44 100644 --- a/trustgraph-base/trustgraph/base/rabbitmq_backend.py +++ b/trustgraph-base/trustgraph/base/rabbitmq_backend.py @@ -326,7 +326,15 @@ class RabbitMQBackend: port=port, virtual_host=vhost, credentials=pika.PlainCredentials(username, password), - heartbeat=0, + # Heartbeats let us detect silently-dead connections + # (broker restarts, network partitions, orphaned channels) + # within ~2×interval. Consumer threads drive pika's I/O + # loop every 100ms via process_data_events() in receive(), + # so heartbeat frames get pumped automatically. Producers + # reconnect lazily on the next publish if their connection + # has been aged out by the broker. + heartbeat=60, + blocked_connection_timeout=300, ) logger.info(f"RabbitMQ backend: {host}:{port} vhost={vhost}") diff --git a/trustgraph-flow/trustgraph/flow/service/flow.py b/trustgraph-flow/trustgraph/flow/service/flow.py index b864faf9..a5e4a7e1 100644 --- a/trustgraph-flow/trustgraph/flow/service/flow.py +++ b/trustgraph-flow/trustgraph/flow/service/flow.py @@ -363,6 +363,112 @@ class FlowConfig: return topics + @staticmethod + def _topic_is_flow_owned(raw_template): + """Is a topic template owned by the flow system? + + A topic is flow-owned if its template contains at least one + variable substitution (``{id}``, ``{blueprint}``, + ``{workspace}``, ``{param}``, etc.). Pure literal templates + name topics that are created and owned by something else (a + global service, e.g. ``request:tg:librarian``) and must never + be touched by the flow service. + """ + return '{' in raw_template + + def _collect_owned_topics(self, cls, repl_template): + """Resolved set of flow-owned topics for a single flow. + + Only includes topics whose raw template was parameterised + (contains ``{...}``). Literal templates are skipped — they + refer to global topics the flow service does not own. + """ + topics = set() + + for k, v in cls["flow"].items(): + for spec_name, topic_template in v.get("topics", {}).items(): + if not self._topic_is_flow_owned(topic_template): + continue + topics.add(repl_template(topic_template)) + + return topics + + async def _live_owned_topic_closure(self, exclude_flow_id=None): + """Union of flow-owned topics referenced by all live flows. + + Walks every flow record currently registered in the config + service (except ``exclude_flow_id``, typically the flow being + torn down), resolves its blueprint + parameter templates, and + collects the set of flow-owned topics those templates produce. + + Used to drive closure-based topic cleanup on flow stop: a + topic may only be deleted if no remaining live flow would + still template to it. This handles all three scoping cases + transparently — ``{id}`` topics have no other references once + their flow is excluded; ``{blueprint}`` topics stay alive + while another flow of the same blueprint exists; ``{workspace}`` + (when introduced) stays alive while any flow in the workspace + exists. + """ + + live = set() + + flow_ids = await self.config.keys("flow") + + for fid in flow_ids: + + if fid == exclude_flow_id: + continue + + try: + frec_raw = await self.config.get("flow", fid) + if frec_raw is None: + continue + frec = json.loads(frec_raw) + except Exception as e: + logger.warning( + f"Closure sweep: skipping flow {fid}: {e}" + ) + continue + + # Flows mid-shutdown don't keep their topics alive. + if frec.get("status") == "stopping": + continue + + bp_name = frec.get("blueprint-name") + if bp_name is None: + continue + + try: + bp_raw = await self.config.get("flow-blueprint", bp_name) + if bp_raw is None: + continue + bp = json.loads(bp_raw) + except Exception as e: + logger.warning( + f"Closure sweep: skipping flow {fid} " + f"(blueprint {bp_name}): {e}" + ) + continue + + parameters = frec.get("parameters", {}) + + def repl(tmp, bp_name=bp_name, fid=fid, parameters=parameters): + result = tmp.replace( + "{blueprint}", bp_name + ).replace( + "{id}", fid + ) + for pname, pvalue in parameters.items(): + result = result.replace( + f"{{{pname}}}", str(pvalue) + ) + return result + + live.update(self._collect_owned_topics(bp, repl)) + + return live + async def _delete_topics(self, topics): """Delete topics with retries. Best-effort — logs failures but does not raise.""" @@ -424,8 +530,10 @@ class FlowConfig: result = result.replace(f"{{{param_name}}}", str(param_value)) return result - # Collect topic identifiers before removing config - topics = self._collect_flow_topics(cls, repl_template) + # Collect this flow's owned topics before any config changes. + # Global (literal-template) topics are never touched — they are + # managed by whichever service owns them, not by flow-svc. + this_flow_owned = self._collect_owned_topics(cls, repl_template) # Phase 1: Set status to "stopping" and remove processor config. # The config push tells processors to shut down their consumers. @@ -446,9 +554,28 @@ class FlowConfig: await self.config.delete_many(deletes) - # Phase 2: Delete topics with retries, then remove the flow record. - await self._delete_topics(topics) + # Phase 2: Closure-based sweep. Only delete topics that no + # other live flow still references via its blueprint templates. + # This preserves {blueprint}-scoped topics while another flow + # of the same blueprint is still running, and {workspace}-scoped + # topics while any flow in that workspace remains. + live_owned = await self._live_owned_topic_closure( + exclude_flow_id=msg.flow_id, + ) + to_delete = this_flow_owned - live_owned + + if to_delete: + await self._delete_topics(to_delete) + + kept = this_flow_owned - to_delete + if kept: + logger.info( + f"Flow {msg.flow_id}: keeping {len(kept)} topics " + f"still referenced by other live flows" + ) + + # Phase 3: Remove the flow record. if msg.flow_id in await self.config.keys("flow"): await self.config.delete("flow", msg.flow_id)