diff --git a/docs/tech-specs/active-flow-key-restructure.md b/docs/tech-specs/active-flow-key-restructure.md new file mode 100644 index 00000000..754e7953 --- /dev/null +++ b/docs/tech-specs/active-flow-key-restructure.md @@ -0,0 +1,67 @@ +--- +layout: default +title: "Active-Flow Key Restructure" +parent: "Tech Specs" +--- + +# Active-Flow Key Restructure + +## Problem + +Active-flow config uses `('active-flow', processor)` as its key, where +each processor's value is a JSON blob containing all flow variants +assigned to that processor: + +``` +('active-flow', 'chunker') -> { "default": {...}, "flow2": {...} } +``` + +This causes two problems: + +1. **Read-modify-write on every change.** Starting or stopping a flow + requires fetching the processor's current blob, parsing it, adding + or removing a variant, serialising it, and writing it back. This is + a concurrency hazard if two flow operations target the same + processor simultaneously. + +2. **Noisy config pushes.** Config subscribers subscribe to a type, + not a specific key. Every active-flow write triggers a config push + that causes every processor in the system to fetch the full config + and re-evaluate, even though only one processor's config changed. + With N processors in a blueprint, a single flow start/stop causes + N writes and N^2 config fetches across the system. + +## Proposed Change + +Restructure the key to `('active-flow', 'processor:variant')` where +each key holds a single flow variant's configuration: + +``` +('active-flow', 'chunker:default') -> { "topics": {...}, "parameters": {...} } +('active-flow', 'chunker:flow2') -> { "topics": {...}, "parameters": {...} } +``` + +Starting a flow is a set of clean puts. Stopping a flow is a set of +clean deletes. No read-modify-write. No JSON blob merging. + +The config push problem (all processors fetching on every change) +remains — that's a limitation of the config subscription model and +would require per-key subscriptions to solve. 
But eliminating the +read-modify-write removes the concurrency hazard and simplifies the +flow service code. + +## What Changes + +- **Flow service** (`flow.py`): `handle_start_flow` writes individual + keys per processor:variant instead of merging into per-processor + blobs. `handle_stop_flow` deletes individual keys instead of + read-modify-write. +- **FlowProcessor** (`flow_processor.py`): `on_configure_flows` + currently looks up `config["active-flow"][self.id]` to find a JSON + blob of all its variants. Needs to scan all active-flow keys for + entries prefixed with `self.id:` and assemble its flow list from + those. +- **Config client**: May benefit from a prefix-scan or pattern-match + query to support the FlowProcessor lookup efficiently. +- **Initial config / bootstrapping**: Any code that seeds active-flow + entries at deployment time needs to use the new key format. diff --git a/docs/tech-specs/flow-service-queue-lifecycle.md b/docs/tech-specs/flow-service-queue-lifecycle.md new file mode 100644 index 00000000..a724a2ef --- /dev/null +++ b/docs/tech-specs/flow-service-queue-lifecycle.md @@ -0,0 +1,299 @@ +--- +layout: default +title: "Flow Service Separation and Queue Lifecycle Management" +parent: "Tech Specs" +--- + +# Flow Service Separation and Queue Lifecycle Management + +## Overview + +This specification describes the separation of the flow service from the +config service into an independent service, and the addition of explicit +queue lifecycle management to the pub/sub backend abstraction. + +Every queue in the system has an explicit owner responsible for its +creation and deletion: + +- **Flow and blueprint queues** — owned by the flow service +- **System queues** (config, librarian, knowledge, etc.) — owned by + the services themselves + +Consumers never create queues. They connect to queues that already +exist. 
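The ownership rule can be illustrated with a minimal in-memory sketch — all names here are illustrative stand-ins, not the project's actual API: an owner creates queues explicitly, and a consumer only attaches to queues that already exist, failing fast otherwise.

```python
# Minimal in-memory sketch of the ownership rule: only an owner
# creates queues; consumers never create them, only attach, and
# fail fast if the queue is missing. Illustrative names only.

class FakeBroker:
    def __init__(self):
        self.queues = {}  # queue name -> list of pending messages

    def create_queue(self, name):
        # Owner-side operation: idempotent create.
        self.queues.setdefault(name, [])

    def connect(self, name):
        # Consumer-side operation: attach only, never declare.
        if name not in self.queues:
            raise LookupError(f"queue {name!r} does not exist")
        return self.queues[name]


broker = FakeBroker()
broker.create_queue("flow:tg:chunker-input")   # flow service owns creation
q = broker.connect("flow:tg:chunker-input")    # consumer attaches, succeeds

try:
    broker.connect("flow:tg:never-created")    # no owner created this queue
except LookupError as e:
    print("refused:", e)
```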
+ +This addresses a fundamental problem across broker backends: without an +authoritative lifecycle owner, queues are created as a side effect of +consumer connections and never explicitly deleted. In RabbitMQ, this +leads to orphaned durable queues. In Pulsar, persistent topics and +subscriptions survive consumer crashes. In Kafka, topics persist +indefinitely. The solution is the same for all backends: explicit +lifecycle management through the `PubSubBackend` protocol. + +--- + +## Background + +### Current Architecture + +The flow service (`FlowConfig`) and config service (`Configuration`) +are co-located in a single process: `trustgraph-flow/config/service`. +They share a `Processor` class that inherits from `AsyncProcessor` and +manages both config and flow request/response queues. `FlowConfig` +receives a direct reference to the `Configuration` object, giving it +backdoor access to `inc_version()` and `push()` — methods that bypass +the config service's own pub/sub interface. + +The flow service manages flow lifecycle (start/stop) by manipulating +config state — active-flow entries, flow records, blueprint lookups — +but takes no active part in broker queue management. Queues are created +implicitly when the first consumer connects and are never explicitly +deleted. + +### The Queue Lifecycle Problem + +Queues are currently created as a side effect of consumer connections +(in `_connect()` for RabbitMQ, in `subscribe()` for Pulsar). No single +component owns queue lifecycle, leading to two failure modes: + +- **Orphaned queues**: When a flow is stopped, consumers shut down but + their queues remain — along with any messages in them. In RabbitMQ, + durable queues persist indefinitely. In Pulsar, persistent topics and + their subscriptions survive consumer disconnection unless + `unsubscribe()` is explicitly called — which doesn't happen on crash + or error paths. 
+- **Premature deletion**: If consumers attempt to delete queues on + exit, error-path shutdowns destroy queues that other consumers or the + system still need. + +Neither strategy is reliable because neither the consumer nor the +broker knows whether a queue should exist — only the flow manager +knows that. + +### Why Separation + +The flow service currently piggybacks on the config service process. +Adding broker queue management to the flow service introduces operations +that may have significant latency — RabbitMQ queue operations are +generally fast, but Kafka topic creation can involve partition +assignment, replication, and leader election delays. + +The config service is on the critical path for every service in the +system — all services read configuration on startup and respond to +config pushes. Blocking the config service's request/response loop +while waiting for broker operations risks cascading latency across the +entire deployment. + +Separating the flow service into its own process gives it an +independent latency budget. A slow `start-flow` (waiting for queue +creation across multiple brokers) does not affect config reads. +Additionally, the flow service's direct access to the `Configuration` +object is a coupling that masks what should be a clean client +relationship — the flow service only needs to read and write config +entries, which is exactly what the existing config client provides. + +--- + +## Design + +### Queue Ownership Model + +Every queue in the system has exactly one owner responsible for its +creation and deletion: + +| Queue type | Owner | Created when | Deleted when | +|---|---|---|---| +| Flow queues | Flow service | `start-flow` | `stop-flow` | +| Blueprint queues | Flow service | `start-flow` (idempotent) | Never (shared across flow instances) | +| System queues (config, librarian, knowledge, etc.) | Each service | Service startup | Never (system lifetime) | + +Consumers never create queues. 
The consumer's `_connect()` method +connects to a queue that must already exist — it does not declare or +create it. + +### Flow Service as Independent Service + +The flow service becomes its own `Processor(AsyncProcessor)` in a +separate module and process. It: + +- Listens on the existing flow request/response queues (already distinct + from config queues — no consumer migration needed). +- Uses the async `ConfigClient` (extending `RequestResponse`) to + read/write config state (blueprints, active-flow entries, flow + records). Config pushes are triggered automatically by config + writes — the backdoor `inc_version()` and `push()` calls are no + longer needed. +- Has direct access to the pub/sub backend (inherited from + `AsyncProcessor`) for queue lifecycle operations. + +The config service (`trustgraph-flow/config/service`) is simplified: +the flow consumer, flow producer, and `FlowConfig` class are removed. +It returns to being purely a config service. + +### Queue Lifecycle in the Pub/Sub Backend + +The `PubSubBackend` protocol gains queue management methods. All new +methods are async — backends that use blocking I/O (e.g., pika for +RabbitMQ) handle threading internally. + +``` +PubSubBackend: + create_producer(...) # existing + create_consumer(...) # existing + close() # existing + async create_queue(topic, subscription) # new + async delete_queue(topic, subscription) # new + async queue_exists(topic, subscription) # new + async ensure_queue(topic, subscription) # new +``` + +- `create_queue` — create a queue. Idempotent if queue already exists + with the same properties. Fails if properties mismatch. +- `delete_queue` — delete a queue and its messages. Idempotent if + queue does not exist. +- `queue_exists` — check whether a queue exists. Returns bool. +- `ensure_queue` — create-if-not-exists convenience wrapper. 
+ +The `topic` and `subscription` parameters together identify the queue, +mirroring `create_consumer` where the queue name is derived from both. + +Backend implementations: + +- **RabbitMQ**: `queue_declare`, `queue_delete`, and + `queue_declare(passive=True)` via pika. Blocking calls wrapped in + `asyncio.to_thread` inside the backend. Queue name derived using the + existing `_parse_queue_id` logic. +- **Pulsar**: REST calls to the Pulsar admin API (port 8080). + Create/delete persistent topics, delete subscriptions. Requires admin + URL as additional configuration alongside the broker URL. +- **Kafka** (future): `AdminClient.create_topics()` and + `AdminClient.delete_topics()` from the `confluent-kafka` library. + Uses the same bootstrap servers as the broker connection. + +### Flow Start: Queue Creation + +When `handle_start_flow` processes a flow start request, after +resolving parameters and computing the template-substituted topic +identifiers, it creates queues before writing config state. + +Queues are created for both `cls["blueprint"]` and `cls["flow"]` +entries. Blueprint queue creation is idempotent — multiple flows +creating the same blueprint queue is safe. + +The flow start request returns only after queues are confirmed ready. +This gives callers a hard guarantee: when `start-flow` succeeds, the +data path is fully wired. + +### Flow Stop: Two-Phase Shutdown + +Stopping a flow is a two-phase transaction with a retry window between +them. + +**Phase 1 — Signal processors to shut down:** + +1. Set the flow record's status to `"stopping"`. This marks the flow + as in-progress so that if the flow service crashes mid-stop, it can + identify and resume incomplete shutdowns on restart. +2. Remove the flow's variants from each processor's `active-flow` + config entries. +3. Config push fires automatically. 
Each `FlowProcessor` receives the + update, compares wanted vs current flows, and calls `stop_flow` on + flows no longer wanted — closing consumers and producers. + +**Phase 2 — Delete queues with retries, then finalise:** + +1. Retry queue deletion with delays, giving processors time to react + to the config change and disconnect. Queue deletion is idempotent — + if a queue was already removed by a previous attempt or was never + created, the operation succeeds silently. Only `cls["flow"]` entries + (per-flow-instance queues) are deleted — `cls["blueprint"]` entries + are shared infrastructure and are not touched. +2. Delete the `flow` record from config. + +The flow service retries persistently. A queue that cannot be deleted +after retries is logged as an error but does not block the stop +transaction from completing — a leaked queue is less harmful than a +flow that cannot be stopped. + +**Crash recovery:** On startup, the flow service scans for flow +records with `"status": "stopping"`. These represent incomplete +shutdowns from a previous run. For each, it resumes from the +appropriate point — if active-flow entries are already cleared, it +proceeds directly to phase 2 (queue deletion and flow record cleanup). + +### System Service Queues + +System services (config, librarian, knowledge, etc.) are not managed +by the flow service. Each service calls `ensure_queue` for its own +queues during startup. These queues persist for the lifetime of the +system and are never explicitly deleted. + +### Consumer Connection + +Consumers never create queues. The consumer connects to a queue that +must already exist — created either by the flow service (for flow and +blueprint queues) or by the service itself (for system queues). + +For RabbitMQ, this means `_connect()` no longer calls `queue_declare`. +It connects to a queue by name and fails if the queue does not exist. + +For Pulsar, `subscribe()` inherently creates a subscription. 
This is +how Pulsar works and does not conflict with the lifecycle model — +Pulsar's broker manages subscription state, and the flow service uses +the admin API for explicit cleanup. + +--- + +## Operational Impact + +### Deployment + +The flow service is a new container/process alongside the existing +config service. It requires: + +- Access to the message broker (same credentials as other services — + inherited from `AsyncProcessor` via standard CLI args). +- Access to the config service via pub/sub (config request/response + queues — same as any other service that reads config). +- For Pulsar: the admin API URL (separate from the broker URL). + +It does not require direct Cassandra access. + +### Backward Compatibility + +- The flow request/response queue interface is unchanged — API gateway + and CLI tools that send flow requests continue to work without + modification. +- The config service loses its flow handling capability, so both + services must be deployed together. This is a breaking change in + deployment topology but not in API. +- Queue deletion on flow stop is new behaviour. Existing deployments + that rely on queues persisting after flow stop (e.g. for post-mortem + message inspection) would need to drain queues before stopping flows. + +--- + +## Assumptions + +- **The flow service is the sole writer of flow configuration.** The + two-phase stop transaction relies on the flow record's `"stopping"` + status being authoritative — no other service or process modifies + flow records, active-flow entries, or flow blueprints. This is true + today (only `FlowConfig` writes to these config keys) and must remain + true after separation. The config service provides the storage, but + the flow service owns the semantics. 
+ +--- + +## Design Decisions + +| Decision | Resolution | Rationale | +|---|---|---| +| Queue ownership | Every queue has exactly one explicit owner | Eliminates implicit creation, makes lifecycle auditable | +| Queue deletion strategy | Retry persistently, don't block stop | A leaked queue is less harmful than a flow stuck in stopping state | +| Purge without delete | Not needed | Flows are fully dynamic — stop and restart recreates everything | +| Blueprint-level queues | Created on flow start (idempotent), never deleted | Shared across flow instances; creation is safe, deletion would break other flows | +| Flow stop atomicity | Two-phase with `"stopping"` state | Allows crash recovery; flow service can resume incomplete shutdowns | +| Backend protocol methods | All async | Backends hide blocking I/O internally; callers never deal with threading | +| Pulsar lifecycle | REST admin API, not no-op | Persistent topics and subscriptions survive crashes; explicit cleanup needed | +| Consumer queue creation | Consumers never create queues | Single ownership; `_connect()` connects to existing queues only | diff --git a/tests/unit/test_base/test_flow_base_modules.py b/tests/unit/test_base/test_flow_base_modules.py index 33a3c4c2..5bbd7a18 100644 --- a/tests/unit/test_base/test_flow_base_modules.py +++ b/tests/unit/test_base/test_flow_base_modules.py @@ -11,7 +11,7 @@ def test_parameter_spec_is_a_spec_and_adds_parameter_value(): flow = MagicMock(parameter={}) processor = MagicMock() - spec.add(flow, processor, {"temperature": 0.7}) + spec.add(flow, processor, {"parameters": {"temperature": 0.7}}) assert isinstance(spec, Spec) assert "temperature" in flow.parameter diff --git a/tests/unit/test_base/test_flow_processor.py b/tests/unit/test_base/test_flow_processor.py index 8672831a..36a05ec2 100644 --- a/tests/unit/test_base/test_flow_processor.py +++ b/tests/unit/test_base/test_flow_processor.py @@ -1,58 +1,50 @@ """ Unit tests for trustgraph.base.flow_processor -Starting 
small with a single test to verify basic functionality """ import pytest from unittest.mock import AsyncMock, MagicMock, patch from unittest import IsolatedAsyncioTestCase -# Import the service under test from trustgraph.base.flow_processor import FlowProcessor +# Patches needed to let AsyncProcessor.__init__ run without real +# infrastructure while still setting self.id correctly. +ASYNC_PROCESSOR_PATCHES = [ + patch('trustgraph.base.async_processor.get_pubsub', return_value=MagicMock()), + patch('trustgraph.base.async_processor.ProcessorMetrics', return_value=MagicMock()), + patch('trustgraph.base.async_processor.Consumer', return_value=MagicMock()), +] + + +def with_async_processor_patches(func): + """Apply all AsyncProcessor dependency patches to a test.""" + for p in reversed(ASYNC_PROCESSOR_PATCHES): + func = p(func) + return func + + class TestFlowProcessorSimple(IsolatedAsyncioTestCase): """Test FlowProcessor base class functionality""" - @patch('trustgraph.base.async_processor.AsyncProcessor.__init__') - @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler') - async def test_flow_processor_initialization_basic(self, mock_register_config, mock_async_init): + @with_async_processor_patches + async def test_flow_processor_initialization_basic(self, *mocks): """Test basic FlowProcessor initialization""" - # Arrange - mock_async_init.return_value = None - mock_register_config.return_value = None - config = { 'id': 'test-flow-processor', 'taskgroup': AsyncMock() } - # Act processor = FlowProcessor(**config) - # Assert - # Verify AsyncProcessor.__init__ was called - mock_async_init.assert_called_once() - - # Verify register_config_handler was called with the correct handler - mock_register_config.assert_called_once_with( - processor.on_configure_flows, types=["active-flow"] - ) - - # Verify FlowProcessor-specific initialization - assert hasattr(processor, 'flows') + assert processor.id == 'test-flow-processor' assert processor.flows == {} 
- assert hasattr(processor, 'specifications') assert processor.specifications == [] - @patch('trustgraph.base.async_processor.AsyncProcessor.__init__') - @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler') - async def test_register_specification(self, mock_register_config, mock_async_init): + @with_async_processor_patches + async def test_register_specification(self, *mocks): """Test registering a specification""" - # Arrange - mock_async_init.return_value = None - mock_register_config.return_value = None - config = { 'id': 'test-flow-processor', 'taskgroup': AsyncMock() @@ -62,288 +54,210 @@ class TestFlowProcessorSimple(IsolatedAsyncioTestCase): mock_spec = MagicMock() mock_spec.name = 'test-spec' - # Act processor.register_specification(mock_spec) - # Assert assert len(processor.specifications) == 1 assert processor.specifications[0] == mock_spec @patch('trustgraph.base.flow_processor.Flow') - @patch('trustgraph.base.async_processor.AsyncProcessor.__init__') - @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler') - async def test_start_flow(self, mock_register_config, mock_async_init, mock_flow_class): + @with_async_processor_patches + async def test_start_flow(self, *mocks): """Test starting a flow""" - # Arrange - mock_async_init.return_value = None - mock_register_config.return_value = None - + mock_flow_class = mocks[-1] + config = { - 'id': 'test-flow-processor', + 'id': 'test-processor', 'taskgroup': AsyncMock() } processor = FlowProcessor(**config) - processor.id = 'test-processor' # Set id for Flow creation - + mock_flow = AsyncMock() mock_flow_class.return_value = mock_flow - + flow_name = 'test-flow' flow_defn = {'config': 'test-config'} - # Act await processor.start_flow(flow_name, flow_defn) - # Assert assert flow_name in processor.flows - # Verify Flow was created with correct parameters - mock_flow_class.assert_called_once_with('test-processor', flow_name, processor, flow_defn) - # Verify the 
flow's start method was called + mock_flow_class.assert_called_once_with( + 'test-processor', flow_name, processor, flow_defn + ) mock_flow.start.assert_called_once() @patch('trustgraph.base.flow_processor.Flow') - @patch('trustgraph.base.async_processor.AsyncProcessor.__init__') - @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler') - async def test_stop_flow(self, mock_register_config, mock_async_init, mock_flow_class): + @with_async_processor_patches + async def test_stop_flow(self, *mocks): """Test stopping a flow""" - # Arrange - mock_async_init.return_value = None - mock_register_config.return_value = None - + mock_flow_class = mocks[-1] + config = { - 'id': 'test-flow-processor', + 'id': 'test-processor', 'taskgroup': AsyncMock() } processor = FlowProcessor(**config) - processor.id = 'test-processor' - + mock_flow = AsyncMock() mock_flow_class.return_value = mock_flow - - flow_name = 'test-flow' - flow_defn = {'config': 'test-config'} - # Start a flow first - await processor.start_flow(flow_name, flow_defn) - - # Act + flow_name = 'test-flow' + await processor.start_flow(flow_name, {'config': 'test-config'}) + await processor.stop_flow(flow_name) - # Assert assert flow_name not in processor.flows mock_flow.stop.assert_called_once() - @patch('trustgraph.base.flow_processor.Flow') - @patch('trustgraph.base.async_processor.AsyncProcessor.__init__') - @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler') - async def test_stop_flow_not_exists(self, mock_register_config, mock_async_init, mock_flow_class): + @with_async_processor_patches + async def test_stop_flow_not_exists(self, *mocks): """Test stopping a flow that doesn't exist""" - # Arrange - mock_async_init.return_value = None - mock_register_config.return_value = None - config = { 'id': 'test-flow-processor', 'taskgroup': AsyncMock() } processor = FlowProcessor(**config) - - # Act - should not raise an exception + await 
processor.stop_flow('non-existent-flow') - # Assert - flows dict should still be empty assert processor.flows == {} @patch('trustgraph.base.flow_processor.Flow') - @patch('trustgraph.base.async_processor.AsyncProcessor.__init__') - @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler') - async def test_on_configure_flows_basic(self, mock_register_config, mock_async_init, mock_flow_class): + @with_async_processor_patches + async def test_on_configure_flows_basic(self, *mocks): """Test basic flow configuration handling""" - # Arrange - mock_async_init.return_value = None - mock_register_config.return_value = None - + mock_flow_class = mocks[-1] + config = { - 'id': 'test-flow-processor', + 'id': 'test-processor', 'taskgroup': AsyncMock() } processor = FlowProcessor(**config) - processor.id = 'test-processor' - + mock_flow = AsyncMock() mock_flow_class.return_value = mock_flow - - # Configuration with flows for this processor - flow_config = { - 'test-flow': {'config': 'test-config'} - } + config_data = { - 'active-flow': { - 'test-processor': '{"test-flow": {"config": "test-config"}}' + 'processor:test-processor': { + 'test-flow': '{"config": "test-config"}' } } - - # Act + await processor.on_configure_flows(config_data, version=1) - # Assert assert 'test-flow' in processor.flows - mock_flow_class.assert_called_once_with('test-processor', 'test-flow', processor, {'config': 'test-config'}) + mock_flow_class.assert_called_once_with( + 'test-processor', 'test-flow', processor, + {'config': 'test-config'} + ) mock_flow.start.assert_called_once() - @patch('trustgraph.base.flow_processor.Flow') - @patch('trustgraph.base.async_processor.AsyncProcessor.__init__') - @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler') - async def test_on_configure_flows_no_config(self, mock_register_config, mock_async_init, mock_flow_class): + @with_async_processor_patches + async def test_on_configure_flows_no_config(self, *mocks): 
"""Test flow configuration handling when no config exists for this processor""" - # Arrange - mock_async_init.return_value = None - mock_register_config.return_value = None - config = { - 'id': 'test-flow-processor', + 'id': 'test-processor', 'taskgroup': AsyncMock() } processor = FlowProcessor(**config) - processor.id = 'test-processor' - - # Configuration without flows for this processor + config_data = { - 'active-flow': { - 'other-processor': '{"other-flow": {"config": "other-config"}}' + 'processor:other-processor': { + 'other-flow': '{"config": "other-config"}' } } - - # Act + await processor.on_configure_flows(config_data, version=1) - # Assert assert processor.flows == {} - mock_flow_class.assert_not_called() - @patch('trustgraph.base.flow_processor.Flow') - @patch('trustgraph.base.async_processor.AsyncProcessor.__init__') - @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler') - async def test_on_configure_flows_invalid_config(self, mock_register_config, mock_async_init, mock_flow_class): + @with_async_processor_patches + async def test_on_configure_flows_invalid_config(self, *mocks): """Test flow configuration handling with invalid config format""" - # Arrange - mock_async_init.return_value = None - mock_register_config.return_value = None - config = { - 'id': 'test-flow-processor', + 'id': 'test-processor', 'taskgroup': AsyncMock() } processor = FlowProcessor(**config) - processor.id = 'test-processor' - - # Configuration without active-flow key + config_data = { 'other-data': 'some-value' } - - # Act + await processor.on_configure_flows(config_data, version=1) - # Assert assert processor.flows == {} - mock_flow_class.assert_not_called() @patch('trustgraph.base.flow_processor.Flow') - @patch('trustgraph.base.async_processor.AsyncProcessor.__init__') - @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler') - async def test_on_configure_flows_start_and_stop(self, mock_register_config, mock_async_init, 
mock_flow_class): + @with_async_processor_patches + async def test_on_configure_flows_start_and_stop(self, *mocks): """Test flow configuration handling with starting and stopping flows""" - # Arrange - mock_async_init.return_value = None - mock_register_config.return_value = None - + mock_flow_class = mocks[-1] + config = { - 'id': 'test-flow-processor', + 'id': 'test-processor', 'taskgroup': AsyncMock() } processor = FlowProcessor(**config) - processor.id = 'test-processor' - + mock_flow1 = AsyncMock() mock_flow2 = AsyncMock() mock_flow_class.side_effect = [mock_flow1, mock_flow2] - - # First configuration - start flow1 + config_data1 = { - 'active-flow': { - 'test-processor': '{"flow1": {"config": "config1"}}' + 'processor:test-processor': { + 'flow1': '{"config": "config1"}' } } await processor.on_configure_flows(config_data1, version=1) - # Second configuration - stop flow1, start flow2 config_data2 = { - 'active-flow': { - 'test-processor': '{"flow2": {"config": "config2"}}' + 'processor:test-processor': { + 'flow2': '{"config": "config2"}' } } - - # Act + await processor.on_configure_flows(config_data2, version=2) - # Assert - # flow1 should be stopped and removed assert 'flow1' not in processor.flows mock_flow1.stop.assert_called_once() - - # flow2 should be started and added + assert 'flow2' in processor.flows mock_flow2.start.assert_called_once() - @patch('trustgraph.base.async_processor.AsyncProcessor.__init__') - @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler') + @with_async_processor_patches @patch('trustgraph.base.async_processor.AsyncProcessor.start') - async def test_start_calls_parent(self, mock_parent_start, mock_register_config, mock_async_init): + async def test_start_calls_parent(self, mock_parent_start, *mocks): """Test that start() calls parent start method""" - # Arrange - mock_async_init.return_value = None - mock_register_config.return_value = None mock_parent_start.return_value = None - + config = { 'id': 
'test-flow-processor', 'taskgroup': AsyncMock() } processor = FlowProcessor(**config) - - # Act + await processor.start() - # Assert mock_parent_start.assert_called_once() - @patch('trustgraph.base.async_processor.AsyncProcessor.__init__') - @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler') - async def test_add_args_calls_parent(self, mock_register_config, mock_async_init): + async def test_add_args_calls_parent(self): """Test that add_args() calls parent add_args method""" - # Arrange - mock_async_init.return_value = None - mock_register_config.return_value = None - mock_parser = MagicMock() - - # Act + with patch('trustgraph.base.async_processor.AsyncProcessor.add_args') as mock_parent_add_args: FlowProcessor.add_args(mock_parser) - # Assert mock_parent_add_args.assert_called_once_with(mock_parser) if __name__ == '__main__': - pytest.main([__file__]) \ No newline at end of file + pytest.main([__file__]) diff --git a/tests/unit/test_cores/test_knowledge_manager.py b/tests/unit/test_cores/test_knowledge_manager.py index 76095690..80c27fe8 100644 --- a/tests/unit/test_cores/test_knowledge_manager.py +++ b/tests/unit/test_cores/test_knowledge_manager.py @@ -30,8 +30,8 @@ def mock_flow_config(): mock_config.flows = { "test-flow": { "interfaces": { - "triples-store": "test-triples-queue", - "graph-embeddings-store": "test-ge-queue" + "triples-store": {"flow": "test-triples-queue"}, + "graph-embeddings-store": {"flow": "test-ge-queue"} } } } diff --git a/tests/unit/test_gateway/test_config_receiver.py b/tests/unit/test_gateway/test_config_receiver.py index c2a149d5..90ba8d33 100644 --- a/tests/unit/test_gateway/test_config_receiver.py +++ b/tests/unit/test_gateway/test_config_receiver.py @@ -121,7 +121,7 @@ class TestConfigReceiver: fetch_calls.append(kwargs) config_receiver.fetch_and_apply = mock_fetch - for type_name in ["flow", "active-flow"]: + for type_name in ["flow"]: fetch_calls.clear() config_receiver.config_version = 1 diff --git 
a/tests/unit/test_gateway/test_dispatch_manager.py b/tests/unit/test_gateway/test_dispatch_manager.py index 83969fdd..4ebcb5b9 100644 --- a/tests/unit/test_gateway/test_dispatch_manager.py +++ b/tests/unit/test_gateway/test_dispatch_manager.py @@ -277,7 +277,7 @@ class TestDispatcherManager: # Setup test flow manager.flows["test_flow"] = { "interfaces": { - "triples-store": {"queue": "test_queue"} + "triples-store": {"flow": "test_queue"} } } @@ -298,7 +298,7 @@ class TestDispatcherManager: backend=mock_backend, ws="ws", running="running", - queue={"queue": "test_queue"} + queue="test_queue" ) mock_dispatcher.start.assert_called_once() assert result == mock_dispatcher @@ -328,7 +328,7 @@ class TestDispatcherManager: # Setup test flow manager.flows["test_flow"] = { "interfaces": { - "triples-store": {"queue": "test_queue"} + "triples-store": {"flow": "test_queue"} } } @@ -350,7 +350,7 @@ class TestDispatcherManager: # Setup test flow manager.flows["test_flow"] = { "interfaces": { - "triples-store": {"queue": "test_queue"} + "triples-store": {"flow": "test_queue"} } } @@ -370,7 +370,7 @@ class TestDispatcherManager: backend=mock_backend, ws="ws", running="running", - queue={"queue": "test_queue"}, + queue="test_queue", consumer="api-gateway-test-uuid", subscriber="api-gateway-test-uuid" ) @@ -481,7 +481,7 @@ class TestDispatcherManager: # Setup test flow manager.flows["test_flow"] = { "interfaces": { - "text-load": {"queue": "text_load_queue"} + "text-load": {"flow": "text_load_queue"} } } @@ -502,7 +502,7 @@ class TestDispatcherManager: # Verify dispatcher was created with correct parameters mock_dispatcher_class.assert_called_once_with( backend=mock_backend, - queue={"queue": "text_load_queue"} + queue="text_load_queue" ) mock_dispatcher.start.assert_called_once() mock_dispatcher.process.assert_called_once_with("data", "responder") diff --git a/trustgraph-base/trustgraph/base/__init__.py b/trustgraph-base/trustgraph/base/__init__.py index ce17a585..91622156 100644 
--- a/trustgraph-base/trustgraph/base/__init__.py +++ b/trustgraph-base/trustgraph/base/__init__.py @@ -5,7 +5,7 @@ from . consumer import Consumer from . producer import Producer from . publisher import Publisher from . subscriber import Subscriber -from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics +from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics, SubscriberMetrics from . logging import add_logging_args, setup_logging from . flow_processor import FlowProcessor from . consumer_spec import ConsumerSpec @@ -22,6 +22,7 @@ from . text_completion_client import ( TextCompletionClientSpec, TextCompletionClient, TextCompletionResult, ) from . prompt_client import PromptClientSpec, PromptClient, PromptResult +from . config_client import ConfigClientSpec, ConfigClient from . triples_store_service import TriplesStoreService from . graph_embeddings_store_service import GraphEmbeddingsStoreService from . document_embeddings_store_service import DocumentEmbeddingsStoreService diff --git a/trustgraph-base/trustgraph/base/backend.py b/trustgraph-base/trustgraph/base/backend.py index f0d6b397..0f95ca1b 100644 --- a/trustgraph-base/trustgraph/base/backend.py +++ b/trustgraph-base/trustgraph/base/backend.py @@ -159,6 +159,62 @@ class PubSubBackend(Protocol): """ ... + async def create_queue(self, topic: str, subscription: str) -> None: + """ + Pre-create a queue so it exists before any consumer connects. + + The topic and subscription together identify the queue, mirroring + create_consumer where the queue name is derived from both. + + Idempotent — creating an already-existing queue succeeds silently. + + Args: + topic: Queue identifier in class:topicspace:topic format + subscription: Subscription/consumer group name + """ + ... + + async def delete_queue(self, topic: str, subscription: str) -> None: + """ + Delete a queue and any messages it contains. 
+ + The topic and subscription together identify the queue, mirroring + create_consumer where the queue name is derived from both. + + Idempotent — deleting a non-existent queue succeeds silently. + + Args: + topic: Queue identifier in class:topicspace:topic format + subscription: Subscription/consumer group name + """ + ... + + async def queue_exists(self, topic: str, subscription: str) -> bool: + """ + Check whether a queue exists. + + Args: + topic: Queue identifier in class:topicspace:topic format + subscription: Subscription/consumer group name + + Returns: + True if the queue exists, False otherwise. + """ + ... + + async def ensure_queue(self, topic: str, subscription: str) -> None: + """ + Ensure a queue exists, creating it if necessary. + + Convenience wrapper — checks existence, creates if missing. + Used by system services on startup. + + Args: + topic: Queue identifier in class:topicspace:topic format + subscription: Subscription/consumer group name + """ + ... + def close(self) -> None: """Close the backend connection.""" ... diff --git a/trustgraph-base/trustgraph/base/config_client.py b/trustgraph-base/trustgraph/base/config_client.py new file mode 100644 index 00000000..c9ec3f9b --- /dev/null +++ b/trustgraph-base/trustgraph/base/config_client.py @@ -0,0 +1,92 @@ + +from . request_response_spec import RequestResponse, RequestResponseSpec +from .. schema import ConfigRequest, ConfigResponse, ConfigKey, ConfigValue + +CONFIG_TIMEOUT = 10 + + +class ConfigClient(RequestResponse): + + async def _request(self, timeout=CONFIG_TIMEOUT, **kwargs): + resp = await self.request( + ConfigRequest(**kwargs), + timeout=timeout, + ) + if resp.error: + raise RuntimeError( + f"{resp.error.type}: {resp.error.message}" + ) + return resp + + async def get(self, type, key, timeout=CONFIG_TIMEOUT): + """Get a single config value. 
Returns the value string or None.""" + resp = await self._request( + operation="get", + keys=[ConfigKey(type=type, key=key)], + timeout=timeout, + ) + if resp.values and len(resp.values) > 0: + return resp.values[0].value + return None + + async def put(self, type, key, value, timeout=CONFIG_TIMEOUT): + """Put a single config value.""" + await self._request( + operation="put", + values=[ConfigValue(type=type, key=key, value=value)], + timeout=timeout, + ) + + async def put_many(self, values, timeout=CONFIG_TIMEOUT): + """Put multiple config values in a single request. + values is a list of (type, key, value) tuples.""" + await self._request( + operation="put", + values=[ + ConfigValue(type=t, key=k, value=v) + for t, k, v in values + ], + timeout=timeout, + ) + + async def delete(self, type, key, timeout=CONFIG_TIMEOUT): + """Delete a single config key.""" + await self._request( + operation="delete", + keys=[ConfigKey(type=type, key=key)], + timeout=timeout, + ) + + async def delete_many(self, keys, timeout=CONFIG_TIMEOUT): + """Delete multiple config keys in a single request. 
+ keys is a list of (type, key) tuples.""" + await self._request( + operation="delete", + keys=[ + ConfigKey(type=t, key=k) + for t, k in keys + ], + timeout=timeout, + ) + + async def keys(self, type, timeout=CONFIG_TIMEOUT): + """List all keys for a config type.""" + resp = await self._request( + operation="list", + type=type, + timeout=timeout, + ) + return resp.directory + + +class ConfigClientSpec(RequestResponseSpec): + def __init__( + self, request_name, response_name, + ): + super(ConfigClientSpec, self).__init__( + request_name=request_name, + request_schema=ConfigRequest, + response_name=response_name, + response_schema=ConfigResponse, + impl=ConfigClient, + ) diff --git a/trustgraph-base/trustgraph/base/consumer_spec.py b/trustgraph-base/trustgraph/base/consumer_spec.py index ae218b44..023537df 100644 --- a/trustgraph-base/trustgraph/base/consumer_spec.py +++ b/trustgraph-base/trustgraph/base/consumer_spec.py @@ -23,7 +23,7 @@ class ConsumerSpec(Spec): taskgroup = processor.taskgroup, flow = flow, backend = processor.pubsub, - topic = definition[self.name], + topic = definition["topics"][self.name], subscriber = processor.id + "--" + flow.name + "--" + self.name, schema = self.schema, handler = self.handler, diff --git a/trustgraph-base/trustgraph/base/flow_processor.py b/trustgraph-base/trustgraph/base/flow_processor.py index edf974b3..99cb0f53 100644 --- a/trustgraph-base/trustgraph/base/flow_processor.py +++ b/trustgraph-base/trustgraph/base/flow_processor.py @@ -29,9 +29,9 @@ class FlowProcessor(AsyncProcessor): # Initialise base class super(FlowProcessor, self).__init__(**params) - # Register configuration handler + # Register configuration handler for this processor's config type self.register_config_handler( - self.on_configure_flows, types=["active-flow"] + self.on_configure_flows, types=[f"processor:{self.id}"] ) # Initialise flow information state @@ -66,17 +66,16 @@ class FlowProcessor(AsyncProcessor): logger.info(f"Got config version 
{version}") - # Skip over invalid data - if "active-flow" not in config: return - - # Check there's configuration information for me - if self.id in config["active-flow"]: - - # Get my flow config - flow_config = json.loads(config["active-flow"][self.id]) + config_type = f"processor:{self.id}" + # Get my flow config — each key is a variant, each value is + # the JSON config for that flow variant + if config_type in config: + flow_config = { + k: json.loads(v) + for k, v in config[config_type].items() + } else: - logger.debug("No configuration settings for me.") flow_config = {} diff --git a/trustgraph-base/trustgraph/base/parameter_spec.py b/trustgraph-base/trustgraph/base/parameter_spec.py index 4c6dfb21..7b3a3b26 100644 --- a/trustgraph-base/trustgraph/base/parameter_spec.py +++ b/trustgraph-base/trustgraph/base/parameter_spec.py @@ -18,6 +18,6 @@ class ParameterSpec(Spec): def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None: - value = definition.get(self.name, None) + value = definition.get("parameters", {}).get(self.name, None) flow.parameter[self.name] = Parameter(value) diff --git a/trustgraph-base/trustgraph/base/producer_spec.py b/trustgraph-base/trustgraph/base/producer_spec.py index 7e77ef35..16905f4b 100644 --- a/trustgraph-base/trustgraph/base/producer_spec.py +++ b/trustgraph-base/trustgraph/base/producer_spec.py @@ -19,7 +19,7 @@ class ProducerSpec(Spec): producer = Producer( backend = processor.pubsub, - topic = definition[self.name], + topic = definition["topics"][self.name], schema = self.schema, metrics = producer_metrics, ) diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index 6f125399..2100483d 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -266,6 +266,26 @@ class PulsarBackend: return PulsarBackendConsumer(pulsar_consumer, schema) + async def create_queue(self, topic: str, 
subscription: str) -> None: + """No-op — Pulsar auto-creates topics on first use. + TODO: Use admin REST API for explicit persistent topic creation.""" + pass + + async def delete_queue(self, topic: str, subscription: str) -> None: + """No-op — to be replaced with admin REST API calls. + TODO: Delete subscription and persistent topic via admin API.""" + pass + + async def queue_exists(self, topic: str, subscription: str) -> bool: + """Returns True — Pulsar auto-creates on subscribe. + TODO: Use admin REST API for actual existence check.""" + return True + + async def ensure_queue(self, topic: str, subscription: str) -> None: + """No-op — Pulsar auto-creates topics on first use. + TODO: Use admin REST API for explicit creation.""" + pass + def close(self) -> None: """Close the Pulsar client.""" self.client.close() diff --git a/trustgraph-base/trustgraph/base/rabbitmq_backend.py b/trustgraph-base/trustgraph/base/rabbitmq_backend.py index 7de51a0a..43c717c3 100644 --- a/trustgraph-base/trustgraph/base/rabbitmq_backend.py +++ b/trustgraph-base/trustgraph/base/rabbitmq_backend.py @@ -19,6 +19,7 @@ Uses basic_consume (push) instead of basic_get (polling) for efficient message delivery. 
""" +import asyncio import json import time import logging @@ -170,28 +171,37 @@ class RabbitMQBackendConsumer: self._connection = pika.BlockingConnection(self._connection_params) self._channel = self._connection.channel() - # Declare the topic exchange + # Declare the topic exchange (idempotent, also done by producers) self._channel.exchange_declare( exchange=self._exchange_name, exchange_type='topic', durable=True, ) - # Declare the queue — anonymous if exclusive - result = self._channel.queue_declare( - queue=self._queue_name, - durable=self._durable, - exclusive=self._exclusive, - auto_delete=self._auto_delete, - ) - # Capture actual name (important for anonymous queues where name='') - self._queue_name = result.method.queue + if self._exclusive: + # Anonymous ephemeral queue (response/notify class). + # These are per-consumer and must be created here — the + # broker assigns the name. + result = self._channel.queue_declare( + queue='', + durable=False, + exclusive=True, + auto_delete=True, + ) + self._queue_name = result.method.queue - self._channel.queue_bind( - queue=self._queue_name, - exchange=self._exchange_name, - routing_key=self._routing_key, - ) + self._channel.queue_bind( + queue=self._queue_name, + exchange=self._exchange_name, + routing_key=self._routing_key, + ) + else: + # Named queue (flow/request class). Queue must already + # exist — created by the flow service or ensure_queue. + # We just verify it exists and bind to consume. 
+ self._channel.queue_declare( + queue=self._queue_name, passive=True, + ) self._channel.basic_qos(prefetch_count=1) @@ -409,5 +419,124 @@ class RabbitMQBackend: queue_name, schema, queue_durable, exclusive, auto_delete, ) + def _create_queue_sync(self, exchange, routing_key, queue_name, durable): + """Blocking queue creation — run via asyncio.to_thread.""" + connection = None + try: + connection = pika.BlockingConnection(self._connection_params) + channel = connection.channel() + channel.exchange_declare( + exchange=exchange, + exchange_type='topic', + durable=True, + ) + channel.queue_declare( + queue=queue_name, + durable=durable, + exclusive=False, + auto_delete=False, + ) + channel.queue_bind( + queue=queue_name, + exchange=exchange, + routing_key=routing_key, + ) + logger.info(f"Created queue: {queue_name}") + finally: + if connection and connection.is_open: + try: + connection.close() + except Exception: + pass + + async def create_queue(self, topic: str, subscription: str) -> None: + """Pre-create a named queue bound to the topic exchange. + + Only applies to shared queues (flow/request class). Response and + notify queues are anonymous/auto-delete and created by consumers. 
+ """ + exchange, routing_key, cls, durable = self._parse_queue_id(topic) + + if cls in ('response', 'notify'): + return + + queue_name = f"{exchange}.{routing_key}.{subscription}" + await asyncio.to_thread( + self._create_queue_sync, exchange, routing_key, + queue_name, durable, + ) + + def _delete_queue_sync(self, queue_name): + """Blocking queue deletion — run via asyncio.to_thread.""" + connection = None + try: + connection = pika.BlockingConnection(self._connection_params) + channel = connection.channel() + channel.queue_delete(queue=queue_name) + logger.info(f"Deleted queue: {queue_name}") + except Exception as e: + # Idempotent — queue may already be gone + logger.debug(f"Queue delete for {queue_name}: {e}") + finally: + if connection and connection.is_open: + try: + connection.close() + except Exception: + pass + + async def delete_queue(self, topic: str, subscription: str) -> None: + """Delete a named queue and any messages it contains. + + Only applies to shared queues (flow/request class). Response and + notify queues are anonymous/auto-delete and managed by the broker. + """ + exchange, routing_key, cls, durable = self._parse_queue_id(topic) + + if cls in ('response', 'notify'): + return + + queue_name = f"{exchange}.{routing_key}.{subscription}" + await asyncio.to_thread(self._delete_queue_sync, queue_name) + + def _queue_exists_sync(self, queue_name): + """Blocking queue existence check — run via asyncio.to_thread. 
+ Uses passive=True which checks without creating.""" + connection = None + try: + connection = pika.BlockingConnection(self._connection_params) + channel = connection.channel() + channel.queue_declare(queue=queue_name, passive=True) + return True + except pika.exceptions.ChannelClosedByBroker: + # 404 NOT_FOUND — queue does not exist + return False + finally: + if connection and connection.is_open: + try: + connection.close() + except Exception: + pass + + async def queue_exists(self, topic: str, subscription: str) -> bool: + """Check whether a named queue exists. + + Only applies to shared queues (flow/request class). Response and + notify queues are anonymous/ephemeral — always returns False. + """ + exchange, routing_key, cls, durable = self._parse_queue_id(topic) + + if cls in ('response', 'notify'): + return False + + queue_name = f"{exchange}.{routing_key}.{subscription}" + return await asyncio.to_thread( + self._queue_exists_sync, queue_name + ) + + async def ensure_queue(self, topic: str, subscription: str) -> None: + """Ensure a queue exists, creating it if necessary.""" + if not await self.queue_exists(topic, subscription): + await self.create_queue(topic, subscription) + def close(self) -> None: pass diff --git a/trustgraph-base/trustgraph/base/request_response_spec.py b/trustgraph-base/trustgraph/base/request_response_spec.py index d19aae10..b91c655c 100644 --- a/trustgraph-base/trustgraph/base/request_response_spec.py +++ b/trustgraph-base/trustgraph/base/request_response_spec.py @@ -137,10 +137,10 @@ class RequestResponseSpec(Spec): "--" + str(uuid.uuid4()) ), consumer_name = flow.id, - request_topic = definition[self.request_name], + request_topic = definition["topics"][self.request_name], request_schema = self.request_schema, request_metrics = request_metrics, - response_topic = definition[self.response_name], + response_topic = definition["topics"][self.response_name], response_schema = self.response_schema, response_metrics = response_metrics, ) 
diff --git a/trustgraph-base/trustgraph/base/subscriber_spec.py b/trustgraph-base/trustgraph/base/subscriber_spec.py index 39b852e5..bf35f869 100644 --- a/trustgraph-base/trustgraph/base/subscriber_spec.py +++ b/trustgraph-base/trustgraph/base/subscriber_spec.py @@ -20,7 +20,7 @@ class SubscriberSpec(Spec): subscriber = Subscriber( backend = processor.pubsub, - topic = definition[self.name], + topic = definition["topics"][self.name], subscription = flow.id, consumer_name = flow.id, schema = self.schema, diff --git a/trustgraph-cli/trustgraph/cli/show_flows.py b/trustgraph-cli/trustgraph/cli/show_flows.py index d1abf984..f7a14469 100644 --- a/trustgraph-cli/trustgraph/cli/show_flows.py +++ b/trustgraph-cli/trustgraph/cli/show_flows.py @@ -33,7 +33,7 @@ def describe_interfaces(intdefs, flow): lst.append(f"{k} response: {resp}") if kind == "send": - q = intfs[k] + q = intfs[k]["flow"] lst.append(f"{k}: {q}") diff --git a/trustgraph-flow/pyproject.toml b/trustgraph-flow/pyproject.toml index 14e919c0..492af385 100644 --- a/trustgraph-flow/pyproject.toml +++ b/trustgraph-flow/pyproject.toml @@ -61,6 +61,7 @@ api-gateway = "trustgraph.gateway:run" chunker-recursive = "trustgraph.chunking.recursive:run" chunker-token = "trustgraph.chunking.token:run" config-svc = "trustgraph.config.service:run" +flow-svc = "trustgraph.flow.service:run" doc-embeddings-query-milvus = "trustgraph.query.doc_embeddings.milvus:run" doc-embeddings-query-pinecone = "trustgraph.query.doc_embeddings.pinecone:run" doc-embeddings-query-qdrant = "trustgraph.query.doc_embeddings.qdrant:run" diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index 5c235bb2..75232315 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -11,14 +11,10 @@ from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigPush from trustgraph.schema import config_request_queue, 
config_response_queue from trustgraph.schema import config_push_queue -from trustgraph.schema import FlowRequest, FlowResponse -from trustgraph.schema import flow_request_queue, flow_response_queue - from trustgraph.base import AsyncProcessor, Consumer, Producer from trustgraph.base.cassandra_config import add_cassandra_args, resolve_cassandra_config from . config import Configuration -from . flow import FlowConfig from ... base import ProcessorMetrics, ConsumerMetrics, ProducerMetrics from ... base import Consumer, Producer @@ -32,9 +28,6 @@ default_config_request_queue = config_request_queue default_config_response_queue = config_response_queue default_config_push_queue = config_push_queue -default_flow_request_queue = flow_request_queue -default_flow_response_queue = flow_response_queue - default_cassandra_host = "cassandra" class Processor(AsyncProcessor): @@ -51,13 +44,6 @@ class Processor(AsyncProcessor): "config_push_queue", default_config_push_queue ) - flow_request_queue = params.get( - "flow_request_queue", default_flow_request_queue - ) - flow_response_queue = params.get( - "flow_response_queue", default_flow_response_queue - ) - cassandra_host = params.get("cassandra_host") cassandra_username = params.get("cassandra_username") cassandra_password = params.get("cassandra_password") @@ -77,16 +63,11 @@ class Processor(AsyncProcessor): id = params.get("id") - flow_request_schema = FlowRequest - flow_response_schema = FlowResponse - super(Processor, self).__init__( **params | { "config_request_schema": ConfigRequest.__name__, "config_response_schema": ConfigResponse.__name__, "config_push_schema": ConfigPush.__name__, - "flow_request_schema": FlowRequest.__name__, - "flow_response_schema": FlowResponse.__name__, "cassandra_host": self.cassandra_host, "cassandra_username": self.cassandra_username, "cassandra_password": self.cassandra_password, @@ -103,12 +84,8 @@ class Processor(AsyncProcessor): processor = self.id, flow = None, name = "config-push" ) - 
flow_request_metrics = ConsumerMetrics( - processor = self.id, flow = None, name = "flow-request" - ) - flow_response_metrics = ProducerMetrics( - processor = self.id, flow = None, name = "flow-response" - ) + self.config_request_topic = config_request_queue + self.config_request_subscriber = id self.config_request_consumer = Consumer( taskgroup = self.taskgroup, @@ -135,24 +112,6 @@ class Processor(AsyncProcessor): metrics = config_push_metrics, ) - self.flow_request_consumer = Consumer( - taskgroup = self.taskgroup, - backend = self.pubsub, - flow = None, - topic = flow_request_queue, - subscriber = id, - schema = FlowRequest, - handler = self.on_flow_request, - metrics = flow_request_metrics, - ) - - self.flow_response_producer = Producer( - backend = self.pubsub, - topic = flow_response_queue, - schema = FlowResponse, - metrics = flow_response_metrics, - ) - self.config = Configuration( host = self.cassandra_host, username = self.cassandra_username, @@ -161,15 +120,15 @@ class Processor(AsyncProcessor): push = self.push ) - self.flow = FlowConfig(self.config) - logger.info("Config service initialized") async def start(self): + await self.pubsub.ensure_queue( + self.config_request_topic, self.config_request_subscriber + ) await self.push() # Startup poke: empty types = everything await self.config_request_consumer.start() - await self.flow_request_consumer.start() async def push(self, types=None): @@ -193,7 +152,7 @@ class Processor(AsyncProcessor): # Sender-produced ID id = msg.properties()["id"] - logger.info(f"Handling config request {id}...") + logger.debug(f"Handling config request {id}...") resp = await self.config.handle(v) @@ -214,36 +173,6 @@ class Processor(AsyncProcessor): resp, properties={"id": id} ) - async def on_flow_request(self, msg, consumer, flow): - - try: - - v = msg.value() - - # Sender-produced ID - id = msg.properties()["id"] - - logger.info(f"Handling flow request {id}...") - - resp = await self.flow.handle(v) - - await 
self.flow_response_producer.send( - resp, properties={"id": id} - ) - - except Exception as e: - - resp = FlowResponse( - error=Error( - type = "flow-error", - message = str(e), - ), - ) - - await self.flow_response_producer.send( - resp, properties={"id": id} - ) - @staticmethod def add_args(parser): @@ -263,18 +192,6 @@ class Processor(AsyncProcessor): # Note: --config-push-queue is already added by AsyncProcessor.add_args() - parser.add_argument( - '--flow-request-queue', - default=default_flow_request_queue, - help=f'Flow request queue (default: {default_flow_request_queue})' - ) - - parser.add_argument( - '--flow-response-queue', - default=default_flow_response_queue, - help=f'Flow response queue {default_flow_response_queue}', - ) - add_cassandra_args(parser) def run(): diff --git a/trustgraph-flow/trustgraph/cores/knowledge.py b/trustgraph-flow/trustgraph/cores/knowledge.py index 0d5c3d82..d03d4ed6 100644 --- a/trustgraph-flow/trustgraph/cores/knowledge.py +++ b/trustgraph-flow/trustgraph/cores/knowledge.py @@ -192,8 +192,8 @@ class KnowledgeManager: if "graph-embeddings-store" not in flow["interfaces"]: raise RuntimeError("Flow has no graph-embeddings-store") - t_q = flow["interfaces"]["triples-store"] - ge_q = flow["interfaces"]["graph-embeddings-store"] + t_q = flow["interfaces"]["triples-store"]["flow"] + ge_q = flow["interfaces"]["graph-embeddings-store"]["flow"] # Got this far, it should all work await respond( diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index d6390805..400f96d1 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -82,6 +82,9 @@ class Processor(AsyncProcessor): processor = self.id, flow = None, name = "knowledge-response" ) + self.knowledge_request_topic = knowledge_request_queue + self.knowledge_request_subscriber = id + self.knowledge_request_consumer = Consumer( taskgroup = self.taskgroup, backend = self.pubsub, @@ 
-116,6 +119,9 @@ class Processor(AsyncProcessor): async def start(self): + await self.pubsub.ensure_queue( + self.knowledge_request_topic, self.knowledge_request_subscriber + ) await super(Processor, self).start() await self.knowledge_request_consumer.start() await self.knowledge_response_producer.start() diff --git a/trustgraph-flow/trustgraph/flow/__init__.py b/trustgraph-flow/trustgraph/flow/__init__.py new file mode 100644 index 00000000..214f7272 --- /dev/null +++ b/trustgraph-flow/trustgraph/flow/__init__.py @@ -0,0 +1,2 @@ + +from . service import * diff --git a/trustgraph-flow/trustgraph/flow/service/__init__.py b/trustgraph-flow/trustgraph/flow/service/__init__.py new file mode 100644 index 00000000..214f7272 --- /dev/null +++ b/trustgraph-flow/trustgraph/flow/service/__init__.py @@ -0,0 +1,2 @@ + +from . service import * diff --git a/trustgraph-flow/trustgraph/flow/service/__main__.py b/trustgraph-flow/trustgraph/flow/service/__main__.py new file mode 100644 index 00000000..da5a9021 --- /dev/null +++ b/trustgraph-flow/trustgraph/flow/service/__main__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from . 
service import run + +if __name__ == '__main__': + run() diff --git a/trustgraph-flow/trustgraph/config/service/flow.py b/trustgraph-flow/trustgraph/flow/service/flow.py similarity index 57% rename from trustgraph-flow/trustgraph/config/service/flow.py rename to trustgraph-flow/trustgraph/flow/service/flow.py index 775c8b4e..477c6a2c 100644 --- a/trustgraph-flow/trustgraph/config/service/flow.py +++ b/trustgraph-flow/trustgraph/flow/service/flow.py @@ -1,15 +1,22 @@ from trustgraph.schema import FlowResponse, Error +import asyncio import json import logging # Module logger logger = logging.getLogger(__name__) +# Queue deletion retry settings +DELETE_RETRIES = 5 +DELETE_RETRY_DELAY = 2 # seconds + + class FlowConfig: - def __init__(self, config): + def __init__(self, config, pubsub): self.config = config + self.pubsub = pubsub # Cache for parameter type definitions to avoid repeated lookups self.param_type_cache = {} @@ -22,9 +29,12 @@ class FlowConfig: user_params: User-provided parameters dict (may be None or empty) Returns: - Complete parameter dict with user values and defaults merged (all values as strings) + Complete parameter dict with user values and defaults merged + (all values as strings) """ + # If the flow blueprint has no parameters section, return user params as-is (stringified) + if "parameters" not in flow_blueprint: if not user_params: return {} @@ -49,7 +59,9 @@ class FlowConfig: if param_type not in self.param_type_cache: try: # Fetch parameter type definition from config store - type_def = await self.config.get("parameter-type").get(param_type) + type_def = await self.config.get( + "parameter-type", param_type + ) if type_def: self.param_type_cache[param_type] = json.loads(type_def) else: @@ -102,32 +114,29 @@ class FlowConfig: async def handle_list_blueprints(self, msg): - names = list(await self.config.get("flow-blueprint").keys()) + names = list(await self.config.keys("flow-blueprint")) return FlowResponse( error = None, blueprint_names = 
names, ) - + async def handle_get_blueprint(self, msg): return FlowResponse( error = None, blueprint_definition = await self.config.get( - "flow-blueprint" - ).get(msg.blueprint_name), + "flow-blueprint", msg.blueprint_name + ), ) - + async def handle_put_blueprint(self, msg): - await self.config.get("flow-blueprint").put( + await self.config.put( + "flow-blueprint", msg.blueprint_name, msg.blueprint_definition ) - await self.config.inc_version() - - await self.config.push(types=["flow-blueprint"]) - return FlowResponse( error = None, ) @@ -136,28 +145,24 @@ class FlowConfig: logger.debug(f"Flow config message: {msg}") - await self.config.get("flow-blueprint").delete(msg.blueprint_name) - - await self.config.inc_version() - - await self.config.push(types=["flow-blueprint"]) + await self.config.delete("flow-blueprint", msg.blueprint_name) return FlowResponse( error = None, ) - + async def handle_list_flows(self, msg): - names = list(await self.config.get("flow").keys()) + names = list(await self.config.keys("flow")) return FlowResponse( error = None, flow_ids = names, ) - + async def handle_get_flow(self, msg): - flow_data = await self.config.get("flow").get(msg.flow_id) + flow_data = await self.config.get("flow", msg.flow_id) flow = json.loads(flow_data) return FlowResponse( @@ -166,7 +171,7 @@ class FlowConfig: description = flow.get("description", ""), parameters = flow.get("parameters", {}), ) - + async def handle_start_flow(self, msg): if msg.blueprint_name is None: @@ -175,17 +180,17 @@ class FlowConfig: if msg.flow_id is None: raise RuntimeError("No flow ID") - if msg.flow_id in await self.config.get("flow").keys(): + if msg.flow_id in await self.config.keys("flow"): raise RuntimeError("Flow already exists") if msg.description is None: raise RuntimeError("No description") - if msg.blueprint_name not in await self.config.get("flow-blueprint").keys(): + if msg.blueprint_name not in await self.config.keys("flow-blueprint"): raise RuntimeError("Blueprint does not 
exist") cls = json.loads( - await self.config.get("flow-blueprint").get(msg.blueprint_name) + await self.config.get("flow-blueprint", msg.blueprint_name) ) # Resolve parameters by merging user-provided values with defaults @@ -210,6 +215,15 @@ class FlowConfig: return result + # Pre-create flow-level queues so the data path is wired + # before processors receive their config and start connecting. + queues = self._collect_flow_queues(cls, repl_template_with_params) + for topic, subscription in queues: + await self.pubsub.create_queue(topic, subscription) + + # Build all processor config updates, then write in a single batch. + updates = [] + for kind in ("blueprint", "flow"): for k, v in cls[kind].items(): @@ -218,37 +232,34 @@ class FlowConfig: variant = repl_template_with_params(variant) - v = { + topics = { repl_template_with_params(k2): repl_template_with_params(v2) - for k2, v2 in v.items() + for k2, v2 in v.get("topics", {}).items() } - flac = await self.config.get("active-flow").get(processor) - if flac is not None: - target = json.loads(flac) - else: - target = {} + params = { + repl_template_with_params(k2): repl_template_with_params(v2) + for k2, v2 in v.get("parameters", {}).items() + } - # The condition if variant not in target: means it only adds - # the configuration if the variant doesn't already exist. - # If "everything" already exists in the target with old - # values, they won't update. 
+ entry = { + "topics": topics, + "parameters": params, + } - if variant not in target: - target[variant] = v + updates.append(( + f"processor:{processor}", + variant, + json.dumps(entry), + )) - await self.config.get("active-flow").put( - processor, json.dumps(target) - ) + await self.config.put_many(updates) def repl_interface(i): - if isinstance(i, str): - return repl_template_with_params(i) - else: - return { - k: repl_template_with_params(v) - for k, v in i.items() - } + return { + k: repl_template_with_params(v) + for k, v in i.items() + } if "interfaces" in cls: interfaces = { @@ -258,8 +269,8 @@ class FlowConfig: else: interfaces = {} - await self.config.get("flow").put( - msg.flow_id, + await self.config.put( + "flow", msg.flow_id, json.dumps({ "description": msg.description, "blueprint-name": msg.blueprint_name, @@ -268,23 +279,131 @@ class FlowConfig: }) ) - await self.config.inc_version() - - await self.config.push(types=["active-flow", "flow"]) - return FlowResponse( error = None, ) - + + async def ensure_existing_flow_queues(self): + """Ensure queues exist for all already-running flows. + + Called on startup to handle flows that were started before this + version of the flow service was deployed, or before a restart. 
+ """ + flow_ids = await self.config.keys("flow") + + for flow_id in flow_ids: + try: + flow_data = await self.config.get("flow", flow_id) + if flow_data is None: + continue + + flow = json.loads(flow_data) + + blueprint_name = flow.get("blueprint-name") + if blueprint_name is None: + continue + + # Skip flows that are mid-shutdown + if flow.get("status") == "stopping": + continue + + parameters = flow.get("parameters", {}) + + blueprint_data = await self.config.get( + "flow-blueprint", blueprint_name + ) + if blueprint_data is None: + logger.warning( + f"Blueprint '{blueprint_name}' not found for " + f"flow '{flow_id}', skipping queue creation" + ) + continue + + cls = json.loads(blueprint_data) + + def repl_template(tmp): + result = tmp.replace( + "{blueprint}", blueprint_name + ).replace( + "{id}", flow_id + ) + for param_name, param_value in parameters.items(): + result = result.replace( + f"{{{param_name}}}", str(param_value) + ) + return result + + queues = self._collect_flow_queues(cls, repl_template) + for topic, subscription in queues: + await self.pubsub.ensure_queue(topic, subscription) + + logger.info( + f"Ensured queues for existing flow '{flow_id}'" + ) + + except Exception as e: + logger.error( + f"Failed to ensure queues for flow '{flow_id}': {e}" + ) + + def _collect_flow_queues(self, cls, repl_template): + """Collect (topic, subscription) pairs for all flow-level queues. + + Iterates the blueprint's "flow" section and reads only the + "topics" dict from each processor entry. + """ + queues = [] + + for k, v in cls["flow"].items(): + processor, variant = k.split(":", 1) + variant = repl_template(variant) + + for spec_name, topic_template in v.get("topics", {}).items(): + topic = repl_template(topic_template) + subscription = f"{processor}--{variant}--{spec_name}" + queues.append((topic, subscription)) + + return queues + + async def _delete_queues(self, queues): + """Delete queues with retries. 
+        Best-effort — logs failures but does not raise.
+        """
+        for attempt in range(DELETE_RETRIES):
+            remaining = []
+
+            for topic, subscription in queues:
+                try:
+                    await self.pubsub.delete_queue(topic, subscription)
+                except Exception as e:
+                    logger.warning(
+                        f"Queue delete failed (attempt {attempt + 1}/"
+                        f"{DELETE_RETRIES}): {topic}: {e}"
+                    )
+                    remaining.append((topic, subscription))
+
+            if not remaining:
+                return
+
+            queues = remaining
+
+            if attempt < DELETE_RETRIES - 1:
+                await asyncio.sleep(DELETE_RETRY_DELAY)
+
+        for topic, subscription in queues:
+            logger.error(
+                f"Failed to delete queue after {DELETE_RETRIES} "
+                f"attempts: {topic}"
+            )
+
     async def handle_stop_flow(self, msg):
 
         if msg.flow_id is None:
             raise RuntimeError("No flow ID")
 
-        if msg.flow_id not in await self.config.get("flow").keys():
+        if msg.flow_id not in await self.config.keys("flow"):
             raise RuntimeError("Flow ID invalid")
 
-        flow = json.loads(await self.config.get("flow").get(msg.flow_id))
+        flow = json.loads(await self.config.get("flow", msg.flow_id))
 
         if "blueprint-name" not in flow:
             raise RuntimeError("Internal error: flow has no flow blueprint")
@@ -292,7 +411,9 @@ class FlowConfig:
         blueprint_name = flow["blueprint-name"]
         parameters = flow.get("parameters", {})
 
-        cls = json.loads(await self.config.get("flow-blueprint").get(blueprint_name))
+        cls = json.loads(
+            await self.config.get("flow-blueprint", blueprint_name)
+        )
 
         def repl_template(tmp):
             result = tmp.replace(
@@ -305,34 +426,33 @@ class FlowConfig:
             result = result.replace(f"{{{param_name}}}", str(param_value))
             return result
 
-        for kind in ("flow",):
+        # Collect queue identifiers before removing config
+        queues = self._collect_flow_queues(cls, repl_template)
-            for k, v in cls[kind].items():
+        # Phase 1: Set status to "stopping" and remove processor config.
+        # The config push tells processors to shut down their consumers.
+ flow["status"] = "stopping" + await self.config.put( + "flow", msg.flow_id, json.dumps(flow) + ) - processor, variant = k.split(":", 1) + # Delete all processor config entries for this flow. + deletes = [] - variant = repl_template(variant) + for k, v in cls["flow"].items(): - flac = await self.config.get("active-flow").get(processor) + processor, variant = k.split(":", 1) + variant = repl_template(variant) - if flac is not None: - target = json.loads(flac) - else: - target = {} + deletes.append((f"processor:{processor}", variant)) - if variant in target: - del target[variant] + await self.config.delete_many(deletes) - await self.config.get("active-flow").put( - processor, json.dumps(target) - ) + # Phase 2: Delete queues with retries, then remove the flow record. + await self._delete_queues(queues) - if msg.flow_id in await self.config.get("flow").keys(): - await self.config.get("flow").delete(msg.flow_id) - - await self.config.inc_version() - - await self.config.push(types=["active-flow", "flow"]) + if msg.flow_id in await self.config.keys("flow"): + await self.config.delete("flow", msg.flow_id) return FlowResponse( error = None, @@ -368,4 +488,3 @@ class FlowConfig: ) return resp - diff --git a/trustgraph-flow/trustgraph/flow/service/service.py b/trustgraph-flow/trustgraph/flow/service/service.py new file mode 100644 index 00000000..a3f2fb6b --- /dev/null +++ b/trustgraph-flow/trustgraph/flow/service/service.py @@ -0,0 +1,162 @@ + +""" +Flow service. Manages flow lifecycle — starting and stopping flows +by coordinating with the config service via pub/sub. 
+""" + +import logging + +from trustgraph.schema import Error + +from trustgraph.schema import FlowRequest, FlowResponse +from trustgraph.schema import flow_request_queue, flow_response_queue +from trustgraph.schema import ConfigRequest, ConfigResponse +from trustgraph.schema import config_request_queue, config_response_queue + +from trustgraph.base import AsyncProcessor, Consumer, Producer +from trustgraph.base import ConsumerMetrics, ProducerMetrics, SubscriberMetrics +from trustgraph.base import ConfigClient + +from . flow import FlowConfig + +# Module logger +logger = logging.getLogger(__name__) + +default_ident = "flow-svc" + +default_flow_request_queue = flow_request_queue +default_flow_response_queue = flow_response_queue + + +class Processor(AsyncProcessor): + + def __init__(self, **params): + + flow_request_queue = params.get( + "flow_request_queue", default_flow_request_queue + ) + flow_response_queue = params.get( + "flow_response_queue", default_flow_response_queue + ) + + id = params.get("id") + + super(Processor, self).__init__( + **params | { + "flow_request_schema": FlowRequest.__name__, + "flow_response_schema": FlowResponse.__name__, + } + ) + + flow_request_metrics = ConsumerMetrics( + processor = self.id, flow = None, name = "flow-request" + ) + flow_response_metrics = ProducerMetrics( + processor = self.id, flow = None, name = "flow-response" + ) + + self.flow_request_topic = flow_request_queue + self.flow_request_subscriber = id + + self.flow_request_consumer = Consumer( + taskgroup = self.taskgroup, + backend = self.pubsub, + flow = None, + topic = flow_request_queue, + subscriber = id, + schema = FlowRequest, + handler = self.on_flow_request, + metrics = flow_request_metrics, + ) + + self.flow_response_producer = Producer( + backend = self.pubsub, + topic = flow_response_queue, + schema = FlowResponse, + metrics = flow_response_metrics, + ) + + config_req_metrics = ProducerMetrics( + processor=self.id, flow=None, name="config-request", + ) + 
+        config_resp_metrics = SubscriberMetrics(
+            processor=self.id, flow=None, name="config-response",
+        )
+
+        self.config_client = ConfigClient(
+            backend=self.pubsub,
+            subscription=f"{self.id}--config--{id}",
+            consumer_name=self.id,
+            request_topic=config_request_queue,
+            request_schema=ConfigRequest,
+            request_metrics=config_req_metrics,
+            response_topic=config_response_queue,
+            response_schema=ConfigResponse,
+            response_metrics=config_resp_metrics,
+        )
+
+        self.flow = FlowConfig(self.config_client, self.pubsub)
+
+        logger.info("Flow service initialized")
+
+    async def start(self):
+
+        await self.pubsub.ensure_queue(
+            self.flow_request_topic, self.flow_request_subscriber
+        )
+        await self.config_client.start()
+        await self.flow.ensure_existing_flow_queues()
+        await self.flow_request_consumer.start()
+
+    async def on_flow_request(self, msg, consumer, flow):
+
+        # Sender-produced ID, fetched before the try block so the
+        # error path below can also reference it
+        id = msg.properties().get("id")
+
+        try:
+
+            v = msg.value()
+
+            logger.debug(f"Handling flow request {id}...")
+
+            resp = await self.flow.handle(v)
+
+            await self.flow_response_producer.send(
+                resp, properties={"id": id}
+            )
+
+        except Exception as e:
+
+            logger.error(f"Flow request failed: {e}")
+
+            resp = FlowResponse(
+                error=Error(
+                    type = "flow-error",
+                    message = str(e),
+                ),
+            )
+
+            await self.flow_response_producer.send(
+                resp, properties={"id": id}
+            )
+
+    @staticmethod
+    def add_args(parser):
+
+        AsyncProcessor.add_args(parser)
+
+        parser.add_argument(
+            '--flow-request-queue',
+            default=default_flow_request_queue,
+            help=f'Flow request queue (default: {default_flow_request_queue})'
+        )
+
+        parser.add_argument(
+            '--flow-response-queue',
+            default=default_flow_response_queue,
+            help=f'Flow response queue (default: {default_flow_response_queue})',
+        )
+
+def run():
+
+    Processor.launch(default_ident, __doc__)
diff --git a/trustgraph-flow/trustgraph/gateway/config/receiver.py b/trustgraph-flow/trustgraph/gateway/config/receiver.py
index 2323cd61..c721a46a 100755
--- a/trustgraph-flow/trustgraph/gateway/config/receiver.py
+++ b/trustgraph-flow/trustgraph/gateway/config/receiver.py
@@ -54,7 +54,7 @@ class ConfigReceiver:
             return
 
         # Gateway cares about flow config
-        if notify_types and "flow" not in notify_types and "active-flow" not in notify_types:
+        if notify_types and "flow" not in notify_types:
             logger.debug(
                 f"Ignoring config notify v{notify_version}, "
                 f"no flow types in {notify_types}"
diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py
index ef3d5507..592120b1 100644
--- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py
+++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py
@@ -226,7 +226,7 @@ class DispatcherManager:
             raise RuntimeError("This kind not supported by flow")
 
         # FIXME: The -store bit, does it make sense?
-        qconfig = intf_defs[int_kind]
+        qconfig = intf_defs[int_kind]["flow"]
 
         id = str(uuid.uuid4())
 
         dispatcher = import_dispatchers[kind](
@@ -264,7 +264,7 @@ class DispatcherManager:
         if int_kind not in intf_defs:
             raise RuntimeError("This kind not supported by flow")
 
-        qconfig = intf_defs[int_kind]
+        qconfig = intf_defs[int_kind]["flow"]
 
         id = str(uuid.uuid4())
 
         dispatcher = export_dispatchers[kind](
@@ -320,7 +320,7 @@ class DispatcherManager:
         elif kind in sender_dispatchers:
             dispatcher = sender_dispatchers[kind](
                 backend = self.backend,
-                queue = qconfig,
+                queue = qconfig["flow"],
             )
         else:
             raise RuntimeError("Invalid kind")
diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py
index c735a550..83f97bf3 100755
--- a/trustgraph-flow/trustgraph/librarian/service.py
+++ b/trustgraph-flow/trustgraph/librarian/service.py
@@ -162,6 +162,9 @@ class Processor(AsyncProcessor):
             processor = self.id, flow = None, name = "storage-response"
         )
 
+        self.librarian_request_topic = librarian_request_queue
+        self.librarian_request_subscriber = id
+
         self.librarian_request_consumer = Consumer(
             taskgroup = self.taskgroup,
             backend = self.pubsub,
@@ -180,6 +183,9 @@ class Processor(AsyncProcessor):
             metrics = librarian_response_metrics,
         )
 
+        self.collection_request_topic = collection_request_queue
+        self.collection_request_subscriber = id
+
         self.collection_request_consumer = Consumer(
             taskgroup = self.taskgroup,
             backend = self.pubsub,
@@ -248,7 +254,7 @@ class Processor(AsyncProcessor):
 
         self.register_config_handler(
             self.on_librarian_config,
-            types=["flow", "active-flow"],
+            types=["flow"],
         )
 
         self.flows = {}
@@ -257,6 +263,12 @@ class Processor(AsyncProcessor):
 
     async def start(self):
 
+        await self.pubsub.ensure_queue(
+            self.librarian_request_topic, self.librarian_request_subscriber
+        )
+        await self.pubsub.ensure_queue(
+            self.collection_request_topic, self.collection_request_subscriber
+        )
         await super(Processor, self).start()
         await self.librarian_request_consumer.start()
         await self.librarian_response_producer.start()
@@ -365,12 +377,12 @@ class Processor(AsyncProcessor):
         else:
             kind = "document-load"
 
-        q = flow["interfaces"][kind]
+        q = flow["interfaces"][kind]["flow"]
 
         # Emit document provenance to knowledge graph
         if "triples-store" in flow["interfaces"]:
             await self.emit_document_provenance(
-                document, processing, flow["interfaces"]["triples-store"]
+                document, processing, flow["interfaces"]["triples-store"]["flow"]
             )
 
         if kind == "text-load":