Flow service lifecycle management (#822)

feat: separate flow service from config service with explicit queue
lifecycle management

The flow service is now an independent service that owns the lifecycle
of flow and blueprint queues. System services own their own queues.
Consumers never create queues.

Flow service separation:
- New service at trustgraph-flow/trustgraph/flow/service/
- Uses async ConfigClient (RequestResponse pattern) to talk to config
  service
- Config service stripped of all flow handling

Queue lifecycle management:
- PubSubBackend protocol gains create_queue, delete_queue,
  queue_exists, ensure_queue — all async
- RabbitMQ: implements via pika with asyncio.to_thread internally
- Pulsar: stubs for future admin REST API implementation
- Consumer _connect() no longer creates queues (passive=True for named
  queues)
- System services call ensure_queue on startup
- Flow service creates queues on flow start, deletes on flow stop
- Flow service ensures queues for pre-existing flows on startup

Two-phase flow stop:
- Phase 1: set flow status to "stopping", delete processor config
  entries
- Phase 2: retry queue deletion, then delete flow record

Config restructure:
- active-flow config replaced with processor:{name} types
- Each processor has its own config type, each flow variant is a key
- Flow start/stop use batch put/delete — single config push per
  operation
- FlowProcessor subscribes to its own type only

Blueprint format:
- Processor entries split into topics and parameters dicts
- Flow interfaces use {"flow": "topic"} instead of bare strings
- Specs (ConsumerSpec, ProducerSpec, etc.) read from
  definition["topics"]

Tests updated
This commit is contained in:
cybermaggedon 2026-04-16 17:19:39 +01:00 committed by GitHub
parent 645b6a66fd
commit 9f84891fcc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 1202 additions and 398 deletions

View file

@ -61,6 +61,7 @@ api-gateway = "trustgraph.gateway:run"
chunker-recursive = "trustgraph.chunking.recursive:run"
chunker-token = "trustgraph.chunking.token:run"
config-svc = "trustgraph.config.service:run"
flow-svc = "trustgraph.flow.service:run"
doc-embeddings-query-milvus = "trustgraph.query.doc_embeddings.milvus:run"
doc-embeddings-query-pinecone = "trustgraph.query.doc_embeddings.pinecone:run"
doc-embeddings-query-qdrant = "trustgraph.query.doc_embeddings.qdrant:run"

View file

@ -11,14 +11,10 @@ from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigPush
from trustgraph.schema import config_request_queue, config_response_queue
from trustgraph.schema import config_push_queue
from trustgraph.schema import FlowRequest, FlowResponse
from trustgraph.schema import flow_request_queue, flow_response_queue
from trustgraph.base import AsyncProcessor, Consumer, Producer
from trustgraph.base.cassandra_config import add_cassandra_args, resolve_cassandra_config
from . config import Configuration
from . flow import FlowConfig
from ... base import ProcessorMetrics, ConsumerMetrics, ProducerMetrics
from ... base import Consumer, Producer
@ -32,9 +28,6 @@ default_config_request_queue = config_request_queue
default_config_response_queue = config_response_queue
default_config_push_queue = config_push_queue
default_flow_request_queue = flow_request_queue
default_flow_response_queue = flow_response_queue
default_cassandra_host = "cassandra"
class Processor(AsyncProcessor):
@ -51,13 +44,6 @@ class Processor(AsyncProcessor):
"config_push_queue", default_config_push_queue
)
flow_request_queue = params.get(
"flow_request_queue", default_flow_request_queue
)
flow_response_queue = params.get(
"flow_response_queue", default_flow_response_queue
)
cassandra_host = params.get("cassandra_host")
cassandra_username = params.get("cassandra_username")
cassandra_password = params.get("cassandra_password")
@ -77,16 +63,11 @@ class Processor(AsyncProcessor):
id = params.get("id")
flow_request_schema = FlowRequest
flow_response_schema = FlowResponse
super(Processor, self).__init__(
**params | {
"config_request_schema": ConfigRequest.__name__,
"config_response_schema": ConfigResponse.__name__,
"config_push_schema": ConfigPush.__name__,
"flow_request_schema": FlowRequest.__name__,
"flow_response_schema": FlowResponse.__name__,
"cassandra_host": self.cassandra_host,
"cassandra_username": self.cassandra_username,
"cassandra_password": self.cassandra_password,
@ -103,12 +84,8 @@ class Processor(AsyncProcessor):
processor = self.id, flow = None, name = "config-push"
)
flow_request_metrics = ConsumerMetrics(
processor = self.id, flow = None, name = "flow-request"
)
flow_response_metrics = ProducerMetrics(
processor = self.id, flow = None, name = "flow-response"
)
self.config_request_topic = config_request_queue
self.config_request_subscriber = id
self.config_request_consumer = Consumer(
taskgroup = self.taskgroup,
@ -135,24 +112,6 @@ class Processor(AsyncProcessor):
metrics = config_push_metrics,
)
self.flow_request_consumer = Consumer(
taskgroup = self.taskgroup,
backend = self.pubsub,
flow = None,
topic = flow_request_queue,
subscriber = id,
schema = FlowRequest,
handler = self.on_flow_request,
metrics = flow_request_metrics,
)
self.flow_response_producer = Producer(
backend = self.pubsub,
topic = flow_response_queue,
schema = FlowResponse,
metrics = flow_response_metrics,
)
self.config = Configuration(
host = self.cassandra_host,
username = self.cassandra_username,
@ -161,15 +120,15 @@ class Processor(AsyncProcessor):
push = self.push
)
self.flow = FlowConfig(self.config)
logger.info("Config service initialized")
async def start(self):
await self.pubsub.ensure_queue(
self.config_request_topic, self.config_request_subscriber
)
await self.push() # Startup poke: empty types = everything
await self.config_request_consumer.start()
await self.flow_request_consumer.start()
async def push(self, types=None):
@ -193,7 +152,7 @@ class Processor(AsyncProcessor):
# Sender-produced ID
id = msg.properties()["id"]
logger.info(f"Handling config request {id}...")
logger.debug(f"Handling config request {id}...")
resp = await self.config.handle(v)
@ -214,36 +173,6 @@ class Processor(AsyncProcessor):
resp, properties={"id": id}
)
async def on_flow_request(self, msg, consumer, flow):
try:
v = msg.value()
# Sender-produced ID
id = msg.properties()["id"]
logger.info(f"Handling flow request {id}...")
resp = await self.flow.handle(v)
await self.flow_response_producer.send(
resp, properties={"id": id}
)
except Exception as e:
resp = FlowResponse(
error=Error(
type = "flow-error",
message = str(e),
),
)
await self.flow_response_producer.send(
resp, properties={"id": id}
)
@staticmethod
def add_args(parser):
@ -263,18 +192,6 @@ class Processor(AsyncProcessor):
# Note: --config-push-queue is already added by AsyncProcessor.add_args()
parser.add_argument(
'--flow-request-queue',
default=default_flow_request_queue,
help=f'Flow request queue (default: {default_flow_request_queue})'
)
parser.add_argument(
'--flow-response-queue',
default=default_flow_response_queue,
help=f'Flow response queue {default_flow_response_queue}',
)
add_cassandra_args(parser)
def run():

View file

@ -192,8 +192,8 @@ class KnowledgeManager:
if "graph-embeddings-store" not in flow["interfaces"]:
raise RuntimeError("Flow has no graph-embeddings-store")
t_q = flow["interfaces"]["triples-store"]
ge_q = flow["interfaces"]["graph-embeddings-store"]
t_q = flow["interfaces"]["triples-store"]["flow"]
ge_q = flow["interfaces"]["graph-embeddings-store"]["flow"]
# Got this far, it should all work
await respond(

View file

@ -82,6 +82,9 @@ class Processor(AsyncProcessor):
processor = self.id, flow = None, name = "knowledge-response"
)
self.knowledge_request_topic = knowledge_request_queue
self.knowledge_request_subscriber = id
self.knowledge_request_consumer = Consumer(
taskgroup = self.taskgroup,
backend = self.pubsub,
@ -116,6 +119,9 @@ class Processor(AsyncProcessor):
async def start(self):
await self.pubsub.ensure_queue(
self.knowledge_request_topic, self.knowledge_request_subscriber
)
await super(Processor, self).start()
await self.knowledge_request_consumer.start()
await self.knowledge_response_producer.start()

View file

@ -0,0 +1,2 @@
from . service import *

View file

@ -0,0 +1,2 @@
from . service import *

View file

@ -0,0 +1,6 @@
#!/usr/bin/env python3

"""Executable entry point for the flow service package."""

from . service import run

# Delegate to the service's run() when executed as a script/module.
if __name__ == '__main__':
    run()

View file

@ -1,15 +1,22 @@
from trustgraph.schema import FlowResponse, Error
import asyncio
import json
import logging
# Module logger
logger = logging.getLogger(__name__)
# Queue deletion retry settings
DELETE_RETRIES = 5
DELETE_RETRY_DELAY = 2 # seconds
class FlowConfig:
def __init__(self, config):
def __init__(self, config, pubsub):
self.config = config
self.pubsub = pubsub
# Cache for parameter type definitions to avoid repeated lookups
self.param_type_cache = {}
@ -22,9 +29,12 @@ class FlowConfig:
user_params: User-provided parameters dict (may be None or empty)
Returns:
Complete parameter dict with user values and defaults merged (all values as strings)
Complete parameter dict with user values and defaults merged
(all values as strings)
"""
# If the flow blueprint has no parameters section, return user params as-is (stringified)
if "parameters" not in flow_blueprint:
if not user_params:
return {}
@ -49,7 +59,9 @@ class FlowConfig:
if param_type not in self.param_type_cache:
try:
# Fetch parameter type definition from config store
type_def = await self.config.get("parameter-type").get(param_type)
type_def = await self.config.get(
"parameter-type", param_type
)
if type_def:
self.param_type_cache[param_type] = json.loads(type_def)
else:
@ -102,32 +114,29 @@ class FlowConfig:
async def handle_list_blueprints(self, msg):
names = list(await self.config.get("flow-blueprint").keys())
names = list(await self.config.keys("flow-blueprint"))
return FlowResponse(
error = None,
blueprint_names = names,
)
async def handle_get_blueprint(self, msg):
return FlowResponse(
error = None,
blueprint_definition = await self.config.get(
"flow-blueprint"
).get(msg.blueprint_name),
"flow-blueprint", msg.blueprint_name
),
)
async def handle_put_blueprint(self, msg):
await self.config.get("flow-blueprint").put(
await self.config.put(
"flow-blueprint",
msg.blueprint_name, msg.blueprint_definition
)
await self.config.inc_version()
await self.config.push(types=["flow-blueprint"])
return FlowResponse(
error = None,
)
@ -136,28 +145,24 @@ class FlowConfig:
logger.debug(f"Flow config message: {msg}")
await self.config.get("flow-blueprint").delete(msg.blueprint_name)
await self.config.inc_version()
await self.config.push(types=["flow-blueprint"])
await self.config.delete("flow-blueprint", msg.blueprint_name)
return FlowResponse(
error = None,
)
async def handle_list_flows(self, msg):
names = list(await self.config.get("flow").keys())
names = list(await self.config.keys("flow"))
return FlowResponse(
error = None,
flow_ids = names,
)
async def handle_get_flow(self, msg):
flow_data = await self.config.get("flow").get(msg.flow_id)
flow_data = await self.config.get("flow", msg.flow_id)
flow = json.loads(flow_data)
return FlowResponse(
@ -166,7 +171,7 @@ class FlowConfig:
description = flow.get("description", ""),
parameters = flow.get("parameters", {}),
)
async def handle_start_flow(self, msg):
if msg.blueprint_name is None:
@ -175,17 +180,17 @@ class FlowConfig:
if msg.flow_id is None:
raise RuntimeError("No flow ID")
if msg.flow_id in await self.config.get("flow").keys():
if msg.flow_id in await self.config.keys("flow"):
raise RuntimeError("Flow already exists")
if msg.description is None:
raise RuntimeError("No description")
if msg.blueprint_name not in await self.config.get("flow-blueprint").keys():
if msg.blueprint_name not in await self.config.keys("flow-blueprint"):
raise RuntimeError("Blueprint does not exist")
cls = json.loads(
await self.config.get("flow-blueprint").get(msg.blueprint_name)
await self.config.get("flow-blueprint", msg.blueprint_name)
)
# Resolve parameters by merging user-provided values with defaults
@ -210,6 +215,15 @@ class FlowConfig:
return result
# Pre-create flow-level queues so the data path is wired
# before processors receive their config and start connecting.
queues = self._collect_flow_queues(cls, repl_template_with_params)
for topic, subscription in queues:
await self.pubsub.create_queue(topic, subscription)
# Build all processor config updates, then write in a single batch.
updates = []
for kind in ("blueprint", "flow"):
for k, v in cls[kind].items():
@ -218,37 +232,34 @@ class FlowConfig:
variant = repl_template_with_params(variant)
v = {
topics = {
repl_template_with_params(k2): repl_template_with_params(v2)
for k2, v2 in v.items()
for k2, v2 in v.get("topics", {}).items()
}
flac = await self.config.get("active-flow").get(processor)
if flac is not None:
target = json.loads(flac)
else:
target = {}
params = {
repl_template_with_params(k2): repl_template_with_params(v2)
for k2, v2 in v.get("parameters", {}).items()
}
# The condition if variant not in target: means it only adds
# the configuration if the variant doesn't already exist.
# If "everything" already exists in the target with old
# values, they won't update.
entry = {
"topics": topics,
"parameters": params,
}
if variant not in target:
target[variant] = v
updates.append((
f"processor:{processor}",
variant,
json.dumps(entry),
))
await self.config.get("active-flow").put(
processor, json.dumps(target)
)
await self.config.put_many(updates)
def repl_interface(i):
if isinstance(i, str):
return repl_template_with_params(i)
else:
return {
k: repl_template_with_params(v)
for k, v in i.items()
}
return {
k: repl_template_with_params(v)
for k, v in i.items()
}
if "interfaces" in cls:
interfaces = {
@ -258,8 +269,8 @@ class FlowConfig:
else:
interfaces = {}
await self.config.get("flow").put(
msg.flow_id,
await self.config.put(
"flow", msg.flow_id,
json.dumps({
"description": msg.description,
"blueprint-name": msg.blueprint_name,
@ -268,23 +279,131 @@ class FlowConfig:
})
)
await self.config.inc_version()
await self.config.push(types=["active-flow", "flow"])
return FlowResponse(
error = None,
)
async def ensure_existing_flow_queues(self):
"""Ensure queues exist for all already-running flows.
Called on startup to handle flows that were started before this
version of the flow service was deployed, or before a restart.
"""
flow_ids = await self.config.keys("flow")
for flow_id in flow_ids:
try:
flow_data = await self.config.get("flow", flow_id)
if flow_data is None:
continue
flow = json.loads(flow_data)
blueprint_name = flow.get("blueprint-name")
if blueprint_name is None:
continue
# Skip flows that are mid-shutdown
if flow.get("status") == "stopping":
continue
parameters = flow.get("parameters", {})
blueprint_data = await self.config.get(
"flow-blueprint", blueprint_name
)
if blueprint_data is None:
logger.warning(
f"Blueprint '{blueprint_name}' not found for "
f"flow '{flow_id}', skipping queue creation"
)
continue
cls = json.loads(blueprint_data)
def repl_template(tmp):
result = tmp.replace(
"{blueprint}", blueprint_name
).replace(
"{id}", flow_id
)
for param_name, param_value in parameters.items():
result = result.replace(
f"{{{param_name}}}", str(param_value)
)
return result
queues = self._collect_flow_queues(cls, repl_template)
for topic, subscription in queues:
await self.pubsub.ensure_queue(topic, subscription)
logger.info(
f"Ensured queues for existing flow '{flow_id}'"
)
except Exception as e:
logger.error(
f"Failed to ensure queues for flow '{flow_id}': {e}"
)
def _collect_flow_queues(self, cls, repl_template):
"""Collect (topic, subscription) pairs for all flow-level queues.
Iterates the blueprint's "flow" section and reads only the
"topics" dict from each processor entry.
"""
queues = []
for k, v in cls["flow"].items():
processor, variant = k.split(":", 1)
variant = repl_template(variant)
for spec_name, topic_template in v.get("topics", {}).items():
topic = repl_template(topic_template)
subscription = f"{processor}--{variant}--{spec_name}"
queues.append((topic, subscription))
return queues
async def _delete_queues(self, queues):
"""Delete queues with retries. Best-effort — logs failures but
does not raise."""
for attempt in range(DELETE_RETRIES):
remaining = []
for topic, subscription in queues:
try:
await self.pubsub.delete_queue(topic, subscription)
except Exception as e:
logger.warning(
f"Queue delete failed (attempt {attempt + 1}/"
f"{DELETE_RETRIES}): {topic}: {e}"
)
remaining.append((topic, subscription))
if not remaining:
return
queues = remaining
if attempt < DELETE_RETRIES - 1:
await asyncio.sleep(DELETE_RETRY_DELAY)
for topic, subscription in queues:
logger.error(
f"Failed to delete queue after {DELETE_RETRIES} "
f"attempts: {topic}"
)
async def handle_stop_flow(self, msg):
if msg.flow_id is None:
raise RuntimeError("No flow ID")
if msg.flow_id not in await self.config.get("flow").keys():
if msg.flow_id not in await self.config.keys("flow"):
raise RuntimeError("Flow ID invalid")
flow = json.loads(await self.config.get("flow").get(msg.flow_id))
flow = json.loads(await self.config.get("flow", msg.flow_id))
if "blueprint-name" not in flow:
raise RuntimeError("Internal error: flow has no flow blueprint")
@ -292,7 +411,9 @@ class FlowConfig:
blueprint_name = flow["blueprint-name"]
parameters = flow.get("parameters", {})
cls = json.loads(await self.config.get("flow-blueprint").get(blueprint_name))
cls = json.loads(
await self.config.get("flow-blueprint", blueprint_name)
)
def repl_template(tmp):
result = tmp.replace(
@ -305,34 +426,33 @@ class FlowConfig:
result = result.replace(f"{{{param_name}}}", str(param_value))
return result
for kind in ("flow",):
# Collect queue identifiers before removing config
queues = self._collect_flow_queues(cls, repl_template)
for k, v in cls[kind].items():
# Phase 1: Set status to "stopping" and remove processor config.
# The config push tells processors to shut down their consumers.
flow["status"] = "stopping"
await self.config.put(
"flow", msg.flow_id, json.dumps(flow)
)
processor, variant = k.split(":", 1)
# Delete all processor config entries for this flow.
deletes = []
variant = repl_template(variant)
for k, v in cls["flow"].items():
flac = await self.config.get("active-flow").get(processor)
processor, variant = k.split(":", 1)
variant = repl_template(variant)
if flac is not None:
target = json.loads(flac)
else:
target = {}
deletes.append((f"processor:{processor}", variant))
if variant in target:
del target[variant]
await self.config.delete_many(deletes)
await self.config.get("active-flow").put(
processor, json.dumps(target)
)
# Phase 2: Delete queues with retries, then remove the flow record.
await self._delete_queues(queues)
if msg.flow_id in await self.config.get("flow").keys():
await self.config.get("flow").delete(msg.flow_id)
await self.config.inc_version()
await self.config.push(types=["active-flow", "flow"])
if msg.flow_id in await self.config.keys("flow"):
await self.config.delete("flow", msg.flow_id)
return FlowResponse(
error = None,
@ -368,4 +488,3 @@ class FlowConfig:
)
return resp

View file

@ -0,0 +1,162 @@
"""
Flow service. Manages the flow lifecycle — starting and stopping
flows — by coordinating with the config service via pub/sub.
"""
import logging
from trustgraph.schema import Error
from trustgraph.schema import FlowRequest, FlowResponse
from trustgraph.schema import flow_request_queue, flow_response_queue
from trustgraph.schema import ConfigRequest, ConfigResponse
from trustgraph.schema import config_request_queue, config_response_queue
from trustgraph.base import AsyncProcessor, Consumer, Producer
from trustgraph.base import ConsumerMetrics, ProducerMetrics, SubscriberMetrics
from trustgraph.base import ConfigClient
from . flow import FlowConfig
# Module logger
logger = logging.getLogger(__name__)
default_ident = "flow-svc"
default_flow_request_queue = flow_request_queue
default_flow_response_queue = flow_response_queue
class Processor(AsyncProcessor):
    """Flow lifecycle service.

    Owns the flow request/response queues, delegates start/stop logic
    to FlowConfig, and talks to the config service through an async
    ConfigClient.
    """

    def __init__(self, **params):
        """Set up the flow request consumer, response producer and
        config client.

        Args:
            params: Processor configuration; recognises
                flow_request_queue / flow_response_queue overrides plus
                whatever AsyncProcessor accepts.
        """
        flow_request_queue = params.get(
            "flow_request_queue", default_flow_request_queue
        )
        flow_response_queue = params.get(
            "flow_response_queue", default_flow_response_queue
        )

        id = params.get("id")

        super(Processor, self).__init__(
            **params | {
                "flow_request_schema": FlowRequest.__name__,
                "flow_response_schema": FlowResponse.__name__,
            }
        )

        flow_request_metrics = ConsumerMetrics(
            processor = self.id, flow = None, name = "flow-request"
        )
        flow_response_metrics = ProducerMetrics(
            processor = self.id, flow = None, name = "flow-response"
        )

        # Remember topic/subscriber so start() can ensure the queue
        # exists before the consumer attaches (consumers never create
        # queues).
        self.flow_request_topic = flow_request_queue
        self.flow_request_subscriber = id

        self.flow_request_consumer = Consumer(
            taskgroup = self.taskgroup,
            backend = self.pubsub,
            flow = None,
            topic = flow_request_queue,
            subscriber = id,
            schema = FlowRequest,
            handler = self.on_flow_request,
            metrics = flow_request_metrics,
        )

        self.flow_response_producer = Producer(
            backend = self.pubsub,
            topic = flow_response_queue,
            schema = FlowResponse,
            metrics = flow_response_metrics,
        )

        config_req_metrics = ProducerMetrics(
            processor=self.id, flow=None, name="config-request",
        )
        config_resp_metrics = SubscriberMetrics(
            processor=self.id, flow=None, name="config-response",
        )

        # Async request/response client for the config service.
        self.config_client = ConfigClient(
            backend=self.pubsub,
            subscription=f"{self.id}--config--{id}",
            consumer_name=self.id,
            request_topic=config_request_queue,
            request_schema=ConfigRequest,
            request_metrics=config_req_metrics,
            response_topic=config_response_queue,
            response_schema=ConfigResponse,
            response_metrics=config_resp_metrics,
        )

        self.flow = FlowConfig(self.config_client, self.pubsub)

        logger.info("Flow service initialized")

    async def start(self):
        """Bring the service up: own queue first, then dependencies."""
        # This service owns its request queue; create it before the
        # consumer attaches.
        await self.pubsub.ensure_queue(
            self.flow_request_topic, self.flow_request_subscriber
        )
        await self.config_client.start()
        # Re-ensure queues for flows started before this instance.
        await self.flow.ensure_existing_flow_queues()
        await self.flow_request_consumer.start()

    async def on_flow_request(self, msg, consumer, flow):
        """Handle one FlowRequest and send back a FlowResponse.

        Errors are reported to the requester rather than raised.
        """
        # Default the correlation ID up front: if reading the message or
        # its properties raises, the except path below would otherwise
        # hit a NameError on `id` and mask the real error.
        id = None
        try:
            v = msg.value()

            # Sender-produced ID
            id = msg.properties()["id"]

            logger.debug(f"Handling flow request {id}...")

            resp = await self.flow.handle(v)

            await self.flow_response_producer.send(
                resp, properties={"id": id}
            )

        except Exception as e:
            logger.error(f"Flow request failed: {e}")
            resp = FlowResponse(
                error=Error(
                    type = "flow-error",
                    message = str(e),
                ),
            )
            await self.flow_response_producer.send(
                resp, properties={"id": id}
            )

    @staticmethod
    def add_args(parser):
        """Register command-line arguments for the flow service."""
        AsyncProcessor.add_args(parser)

        parser.add_argument(
            '--flow-request-queue',
            default=default_flow_request_queue,
            help=f'Flow request queue (default: {default_flow_request_queue})'
        )

        parser.add_argument(
            '--flow-response-queue',
            default=default_flow_response_queue,
            help=f'Flow response queue {default_flow_response_queue}',
        )
def run():
    """Launch the flow service processor with the module docstring as
    its help text."""
    Processor.launch(default_ident, __doc__)

View file

@ -54,7 +54,7 @@ class ConfigReceiver:
return
# Gateway cares about flow config
if notify_types and "flow" not in notify_types and "active-flow" not in notify_types:
if notify_types and "flow" not in notify_types:
logger.debug(
f"Ignoring config notify v{notify_version}, "
f"no flow types in {notify_types}"

View file

@ -226,7 +226,7 @@ class DispatcherManager:
raise RuntimeError("This kind not supported by flow")
# FIXME: The -store bit, does it make sense?
qconfig = intf_defs[int_kind]
qconfig = intf_defs[int_kind]["flow"]
id = str(uuid.uuid4())
dispatcher = import_dispatchers[kind](
@ -264,7 +264,7 @@ class DispatcherManager:
if int_kind not in intf_defs:
raise RuntimeError("This kind not supported by flow")
qconfig = intf_defs[int_kind]
qconfig = intf_defs[int_kind]["flow"]
id = str(uuid.uuid4())
dispatcher = export_dispatchers[kind](
@ -320,7 +320,7 @@ class DispatcherManager:
elif kind in sender_dispatchers:
dispatcher = sender_dispatchers[kind](
backend = self.backend,
queue = qconfig,
queue = qconfig["flow"],
)
else:
raise RuntimeError("Invalid kind")

View file

@ -162,6 +162,9 @@ class Processor(AsyncProcessor):
processor = self.id, flow = None, name = "storage-response"
)
self.librarian_request_topic = librarian_request_queue
self.librarian_request_subscriber = id
self.librarian_request_consumer = Consumer(
taskgroup = self.taskgroup,
backend = self.pubsub,
@ -180,6 +183,9 @@ class Processor(AsyncProcessor):
metrics = librarian_response_metrics,
)
self.collection_request_topic = collection_request_queue
self.collection_request_subscriber = id
self.collection_request_consumer = Consumer(
taskgroup = self.taskgroup,
backend = self.pubsub,
@ -248,7 +254,7 @@ class Processor(AsyncProcessor):
self.register_config_handler(
self.on_librarian_config,
types=["flow", "active-flow"],
types=["flow"],
)
self.flows = {}
@ -257,6 +263,12 @@ class Processor(AsyncProcessor):
async def start(self):
await self.pubsub.ensure_queue(
self.librarian_request_topic, self.librarian_request_subscriber
)
await self.pubsub.ensure_queue(
self.collection_request_topic, self.collection_request_subscriber
)
await super(Processor, self).start()
await self.librarian_request_consumer.start()
await self.librarian_response_producer.start()
@ -365,12 +377,12 @@ class Processor(AsyncProcessor):
else:
kind = "document-load"
q = flow["interfaces"][kind]
q = flow["interfaces"][kind]["flow"]
# Emit document provenance to knowledge graph
if "triples-store" in flow["interfaces"]:
await self.emit_document_provenance(
document, processing, flow["interfaces"]["triples-store"]
document, processing, flow["interfaces"]["triples-store"]["flow"]
)
if kind == "text-load":