Fix library queue lifecycle (#838)

* Don't delete the global queues (librarian) when flows are deleted

* 60s heartbeat timeouts on RabbitMQ
This commit is contained in:
cybermaggedon 2026-04-21 21:30:19 +01:00 committed by GitHub
parent 0ef49ab6ae
commit 424ace44c4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 140 additions and 5 deletions

View file

@@ -326,7 +326,15 @@ class RabbitMQBackend:
port=port,
virtual_host=vhost,
credentials=pika.PlainCredentials(username, password),
heartbeat=0,
# Heartbeats let us detect silently-dead connections
# (broker restarts, network partitions, orphaned channels)
# within ~2×interval. Consumer threads drive pika's I/O
# loop every 100ms via process_data_events() in receive(),
# so heartbeat frames get pumped automatically. Producers
# reconnect lazily on the next publish if their connection
# has been aged out by the broker.
heartbeat=60,
blocked_connection_timeout=300,
)
logger.info(f"RabbitMQ backend: {host}:{port} vhost={vhost}")

View file

@@ -363,6 +363,112 @@ class FlowConfig:
return topics
@staticmethod
def _topic_is_flow_owned(raw_template):
"""Is a topic template owned by the flow system?
A topic is flow-owned if its template contains at least one
variable substitution (``{id}``, ``{blueprint}``,
``{workspace}``, ``{param}``, etc.). Pure literal templates
name topics that are created and owned by something else (a
global service, e.g. ``request:tg:librarian``) and must never
be touched by the flow service.
"""
return '{' in raw_template
def _collect_owned_topics(self, cls, repl_template):
"""Resolved set of flow-owned topics for a single flow.
Only includes topics whose raw template was parameterised
(contains ``{...}``). Literal templates are skipped they
refer to global topics the flow service does not own.
"""
topics = set()
for k, v in cls["flow"].items():
for spec_name, topic_template in v.get("topics", {}).items():
if not self._topic_is_flow_owned(topic_template):
continue
topics.add(repl_template(topic_template))
return topics
async def _live_owned_topic_closure(self, exclude_flow_id=None):
    """Union of flow-owned topics referenced by all live flows.

    Walks every flow record currently registered in the config
    service (except ``exclude_flow_id``, typically the flow being
    torn down), resolves its blueprint + parameter templates, and
    collects the set of flow-owned topics those templates produce.

    Used to drive closure-based topic cleanup on flow stop: a
    topic may only be deleted if no remaining live flow would
    still template to it.  This handles all three scoping cases
    transparently — ``{id}`` topics have no other references once
    their flow is excluded; ``{blueprint}`` topics stay alive
    while another flow of the same blueprint exists; ``{workspace}``
    (when introduced) stays alive while any flow in the workspace
    exists.

    :param exclude_flow_id: flow ID to omit from the sweep, or
        ``None`` to include every registered flow.
    :returns: set of resolved, flow-owned topic names still in use.
    """
    live = set()
    flow_ids = await self.config.keys("flow")
    for fid in flow_ids:
        if fid == exclude_flow_id:
            continue
        # Best-effort fetch/parse of the flow record: an unreadable
        # record must not abort the whole sweep, so log and move on.
        try:
            frec_raw = await self.config.get("flow", fid)
            if frec_raw is None:
                continue
            frec = json.loads(frec_raw)
        except Exception as e:
            logger.warning(
                f"Closure sweep: skipping flow {fid}: {e}"
            )
            continue
        # Flows mid-shutdown don't keep their topics alive.
        if frec.get("status") == "stopping":
            continue
        bp_name = frec.get("blueprint-name")
        if bp_name is None:
            continue
        # Same best-effort treatment for the blueprint lookup.
        try:
            bp_raw = await self.config.get("flow-blueprint", bp_name)
            if bp_raw is None:
                continue
            bp = json.loads(bp_raw)
        except Exception as e:
            logger.warning(
                f"Closure sweep: skipping flow {fid} "
                f"(blueprint {bp_name}): {e}"
            )
            continue
        parameters = frec.get("parameters", {})
        # Template resolver for THIS flow.  The loop variables are
        # bound as default arguments so each closure keeps its own
        # flow's values rather than the last iteration's.
        def repl(tmp, bp_name=bp_name, fid=fid, parameters=parameters):
            result = tmp.replace(
                "{blueprint}", bp_name
            ).replace(
                "{id}", fid
            )
            # Parameter substitutions run after the built-ins;
            # {{name}} in the f-string yields the literal "{name}".
            for pname, pvalue in parameters.items():
                result = result.replace(
                    f"{{{pname}}}", str(pvalue)
                )
            return result
        live.update(self._collect_owned_topics(bp, repl))
    return live
async def _delete_topics(self, topics):
"""Delete topics with retries. Best-effort — logs failures but
does not raise."""
@@ -424,8 +530,10 @@ class FlowConfig:
result = result.replace(f"{{{param_name}}}", str(param_value))
return result
# Collect topic identifiers before removing config
topics = self._collect_flow_topics(cls, repl_template)
# Collect this flow's owned topics before any config changes.
# Global (literal-template) topics are never touched — they are
# managed by whichever service owns them, not by flow-svc.
this_flow_owned = self._collect_owned_topics(cls, repl_template)
# Phase 1: Set status to "stopping" and remove processor config.
# The config push tells processors to shut down their consumers.
@@ -446,9 +554,28 @@ class FlowConfig:
await self.config.delete_many(deletes)
# Phase 2: Delete topics with retries, then remove the flow record.
await self._delete_topics(topics)
# Phase 2: Closure-based sweep. Only delete topics that no
# other live flow still references via its blueprint templates.
# This preserves {blueprint}-scoped topics while another flow
# of the same blueprint is still running, and {workspace}-scoped
# topics while any flow in that workspace remains.
live_owned = await self._live_owned_topic_closure(
exclude_flow_id=msg.flow_id,
)
to_delete = this_flow_owned - live_owned
if to_delete:
await self._delete_topics(to_delete)
kept = this_flow_owned - to_delete
if kept:
logger.info(
f"Flow {msg.flow_id}: keeping {len(kept)} topics "
f"still referenced by other live flows"
)
# Phase 3: Remove the flow record.
if msg.flow_id in await self.config.keys("flow"):
await self.config.delete("flow", msg.flow_id)