mirror of https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 00:16:23 +02:00

Release/v2.3 -> master

This commit is contained in:
parent 59e269185d
commit e8bc96ef7e

31 changed files with 1202 additions and 398 deletions

67  docs/tech-specs/active-flow-key-restructure.md  Normal file

@ -0,0 +1,67 @@
---
layout: default
title: "Active-Flow Key Restructure"
parent: "Tech Specs"
---

# Active-Flow Key Restructure

## Problem

Active-flow config uses `('active-flow', processor)` as its key, where
each processor's value is a JSON blob containing all flow variants
assigned to that processor:

```
('active-flow', 'chunker') -> { "default": {...}, "flow2": {...} }
```

This causes two problems:

1. **Read-modify-write on every change.** Starting or stopping a flow
   requires fetching the processor's current blob, parsing it, adding
   or removing a variant, serialising it, and writing it back. This is
   a concurrency hazard if two flow operations target the same
   processor simultaneously.

2. **Noisy config pushes.** Config subscribers subscribe to a type,
   not a specific key. Every active-flow write triggers a config push
   that causes every processor in the system to fetch the full config
   and re-evaluate, even though only one processor's config changed.
   With N processors in a blueprint, a single flow start/stop causes
   N writes and N^2 config fetches across the system.
## Proposed Change

Restructure the key to `('active-flow', 'processor:variant')`, where
each key holds a single flow variant's configuration:

```
('active-flow', 'chunker:default') -> { "topics": {...}, "parameters": {...} }
('active-flow', 'chunker:flow2') -> { "topics": {...}, "parameters": {...} }
```

Starting a flow is a set of clean puts. Stopping a flow is a set of
clean deletes. No read-modify-write. No JSON blob merging.

The config push problem (all processors fetching on every change)
remains — that's a limitation of the config subscription model and
would require per-key subscriptions to solve. But eliminating the
read-modify-write removes the concurrency hazard and simplifies the
flow service code.
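Under the new key layout, start and stop reduce to independent key operations. A minimal sketch, using an in-memory stand-in for the config store (the `InMemoryConfig` class and its `put`/`delete` names are illustrative, not the real config client API):

```python
# Illustrative only: InMemoryConfig stands in for the real config
# store; put/delete stand in for whatever operations its client exposes.
class InMemoryConfig:
    """Minimal stand-in for the config store's key-value interface."""

    def __init__(self):
        self.entries = {}   # (type, key) -> value

    def put(self, type_, key, value):
        self.entries[(type_, key)] = value

    def delete(self, type_, key):
        self.entries.pop((type_, key), None)


def start_flow(config, flow_id, processors):
    # One clean put per processor:variant key -- no read-modify-write
    for proc, defn in processors.items():
        config.put("active-flow", f"{proc}:{flow_id}", defn)


def stop_flow(config, flow_id, processors):
    # One clean delete per processor:variant key -- idempotent
    for proc in processors:
        config.delete("active-flow", f"{proc}:{flow_id}")
```

Because each operation touches only its own key, two concurrent flow operations on the same processor no longer race over a shared blob.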
## What Changes

- **Flow service** (`flow.py`): `handle_start_flow` writes individual
  keys per processor:variant instead of merging into per-processor
  blobs. `handle_stop_flow` deletes individual keys instead of doing a
  read-modify-write.
- **FlowProcessor** (`flow_processor.py`): `on_configure_flows`
  currently looks up `config["active-flow"][self.id]` to find a JSON
  blob of all its variants. It needs to scan all active-flow keys for
  entries prefixed with `self.id:` and assemble its flow list from
  those.
- **Config client**: may benefit from a prefix-scan or pattern-match
  query to support the FlowProcessor lookup efficiently.
- **Initial config / bootstrapping**: any code that seeds active-flow
  entries at deployment time needs to use the new key format.
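For illustration, the prefix scan the FlowProcessor needs can be sketched as a pure function over the active-flow key space. The flat `{key: definition}` mapping shape is an assumption for this sketch:

```python
# Hypothetical helper: assumes active-flow is presented to the
# processor as a flat {key: definition} mapping whose keys have
# the form 'processor:variant'.
def flows_for_processor(active_flow, processor_id):
    """Collect {variant: definition} for one processor by scanning
    for keys prefixed with '<processor_id>:'."""
    prefix = processor_id + ":"
    return {
        key[len(prefix):]: defn
        for key, defn in active_flow.items()
        if key.startswith(prefix)
    }
```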
299  docs/tech-specs/flow-service-queue-lifecycle.md  Normal file

@ -0,0 +1,299 @@
---
layout: default
title: "Flow Service Separation and Queue Lifecycle Management"
parent: "Tech Specs"
---

# Flow Service Separation and Queue Lifecycle Management

## Overview

This specification describes the separation of the flow service from the
config service into an independent service, and the addition of explicit
queue lifecycle management to the pub/sub backend abstraction.

Every queue in the system has an explicit owner responsible for its
creation and deletion:

- **Flow and blueprint queues** — owned by the flow service
- **System queues** (config, librarian, knowledge, etc.) — owned by
  the services themselves

Consumers never create queues. They connect to queues that already
exist.

This addresses a fundamental problem across broker backends: without an
authoritative lifecycle owner, queues are created as a side effect of
consumer connections and never explicitly deleted. In RabbitMQ, this
leads to orphaned durable queues. In Pulsar, persistent topics and
subscriptions survive consumer crashes. In Kafka, topics persist
indefinitely. The solution is the same for all backends: explicit
lifecycle management through the `PubSubBackend` protocol.

---
## Background

### Current Architecture

The flow service (`FlowConfig`) and config service (`Configuration`)
are co-located in a single process: `trustgraph-flow/config/service`.
They share a `Processor` class that inherits from `AsyncProcessor` and
manages both config and flow request/response queues. `FlowConfig`
receives a direct reference to the `Configuration` object, giving it
backdoor access to `inc_version()` and `push()` — methods that bypass
the config service's own pub/sub interface.

The flow service manages flow lifecycle (start/stop) by manipulating
config state — active-flow entries, flow records, blueprint lookups —
but takes no active part in broker queue management. Queues are created
implicitly when the first consumer connects and are never explicitly
deleted.

### The Queue Lifecycle Problem

Queues are currently created as a side effect of consumer connections
(in `_connect()` for RabbitMQ, in `subscribe()` for Pulsar). No single
component owns queue lifecycle, leading to two failure modes:

- **Orphaned queues**: When a flow is stopped, consumers shut down but
  their queues remain — along with any messages in them. In RabbitMQ,
  durable queues persist indefinitely. In Pulsar, persistent topics and
  their subscriptions survive consumer disconnection unless
  `unsubscribe()` is explicitly called — which doesn't happen on crash
  or error paths.
- **Premature deletion**: If consumers attempt to delete queues on
  exit, error-path shutdowns destroy queues that other consumers or the
  system still need.

Neither strategy is reliable because neither the consumer nor the
broker knows whether a queue should exist — only the flow manager
knows that.

### Why Separation

The flow service currently piggybacks on the config service process.
Adding broker queue management to the flow service introduces operations
that may have significant latency — RabbitMQ queue operations are
generally fast, but Kafka topic creation can involve partition
assignment, replication, and leader election delays.

The config service is on the critical path for every service in the
system — all services read configuration on startup and respond to
config pushes. Blocking the config service's request/response loop
while waiting for broker operations risks cascading latency across the
entire deployment.

Separating the flow service into its own process gives it an
independent latency budget. A slow `start-flow` (waiting for queue
creation across multiple brokers) does not affect config reads.
Additionally, the flow service's direct access to the `Configuration`
object is a coupling that masks what should be a clean client
relationship — the flow service only needs to read and write config
entries, which is exactly what the existing config client provides.

---
## Design

### Queue Ownership Model

Every queue in the system has exactly one owner responsible for its
creation and deletion:

| Queue type | Owner | Created when | Deleted when |
|---|---|---|---|
| Flow queues | Flow service | `start-flow` | `stop-flow` |
| Blueprint queues | Flow service | `start-flow` (idempotent) | Never (shared across flow instances) |
| System queues (config, librarian, knowledge, etc.) | Each service | Service startup | Never (system lifetime) |

Consumers never create queues. The consumer's `_connect()` method
connects to a queue that must already exist — it does not declare or
create it.

### Flow Service as Independent Service

The flow service becomes its own `Processor(AsyncProcessor)` in a
separate module and process. It:

- Listens on the existing flow request/response queues (already distinct
  from config queues — no consumer migration needed).
- Uses the async `ConfigClient` (extending `RequestResponse`) to
  read/write config state (blueprints, active-flow entries, flow
  records). Config pushes are triggered automatically by config
  writes — the backdoor `inc_version()` and `push()` calls are no
  longer needed.
- Has direct access to the pub/sub backend (inherited from
  `AsyncProcessor`) for queue lifecycle operations.

The config service (`trustgraph-flow/config/service`) is simplified:
the flow consumer, flow producer, and `FlowConfig` class are removed.
It returns to being purely a config service.

### Queue Lifecycle in the Pub/Sub Backend

The `PubSubBackend` protocol gains queue management methods. All new
methods are async — backends that use blocking I/O (e.g. pika for
RabbitMQ) handle threading internally.

```
PubSubBackend:
    create_producer(...)                      # existing
    create_consumer(...)                      # existing
    close()                                   # existing
    async create_queue(topic, subscription)   # new
    async delete_queue(topic, subscription)   # new
    async queue_exists(topic, subscription)   # new
    async ensure_queue(topic, subscription)   # new
```

- `create_queue` — create a queue. Idempotent if the queue already
  exists with the same properties. Fails if properties mismatch.
- `delete_queue` — delete a queue and its messages. Idempotent if the
  queue does not exist.
- `queue_exists` — check whether a queue exists. Returns bool.
- `ensure_queue` — create-if-not-exists convenience wrapper.

The `topic` and `subscription` parameters together identify the queue,
mirroring `create_consumer`, where the queue name is derived from both.
Backend implementations:

- **RabbitMQ**: `queue_declare`, `queue_delete`, and
  `queue_declare(passive=True)` via pika. Blocking calls are wrapped in
  `asyncio.to_thread` inside the backend. The queue name is derived
  using the existing `_parse_queue_id` logic.
- **Pulsar**: REST calls to the Pulsar admin API (port 8080).
  Create/delete persistent topics, delete subscriptions. Requires the
  admin URL as additional configuration alongside the broker URL.
- **Kafka** (future): `AdminClient.create_topics()` and
  `AdminClient.delete_topics()` from the `confluent-kafka` library.
  Uses the same bootstrap servers as the broker connection.
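A hedged sketch of the RabbitMQ shape described above: blocking pika calls pushed onto a worker thread with `asyncio.to_thread`, and a passive declare used as the existence check. The `derive_queue_name` helper and the `channel_factory` wiring are illustrative assumptions (the real code derives names via `_parse_queue_id`):

```python
import asyncio


def derive_queue_name(topic, subscription):
    # Assumed naming scheme, for illustration only.
    return f"{topic}::{subscription}"


class RabbitMQLifecycle:
    def __init__(self, channel_factory):
        # channel_factory returns a fresh channel exposing pika's
        # blocking queue_declare/queue_delete calls.
        self.channel_factory = channel_factory

    def _declare(self, name, passive):
        ch = self.channel_factory()
        ch.queue_declare(queue=name, durable=True, passive=passive)

    async def create_queue(self, topic, subscription):
        name = derive_queue_name(topic, subscription)
        # Blocking call moved off the event loop
        await asyncio.to_thread(self._declare, name, False)

    async def queue_exists(self, topic, subscription):
        name = derive_queue_name(topic, subscription)
        try:
            # passive=True checks existence without creating;
            # pika raises ChannelClosedByBroker (404) if absent
            await asyncio.to_thread(self._declare, name, True)
            return True
        except Exception:
            return False

    async def delete_queue(self, topic, subscription):
        name = derive_queue_name(topic, subscription)
        ch = self.channel_factory()
        await asyncio.to_thread(ch.queue_delete, name)
```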
### Flow Start: Queue Creation

When `handle_start_flow` processes a flow start request, after
resolving parameters and computing the template-substituted topic
identifiers, it creates queues before writing config state.

Queues are created for both `cls["blueprint"]` and `cls["flow"]`
entries. Blueprint queue creation is idempotent — multiple flows
creating the same blueprint queue is safe.

The flow start request returns only after queues are confirmed ready.
This gives callers a hard guarantee: when `start-flow` succeeds, the
data path is fully wired.
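The ordering can be sketched as follows; all names and record shapes here are illustrative, not the real `handle_start_flow` signature:

```python
import asyncio


async def start_flow(config, backend, flow_id, blueprint_queues, flow_queues):
    # Blueprint queues are shared: create-if-not-exists is safe to repeat
    for topic, sub in blueprint_queues:
        await backend.ensure_queue(topic, sub)
    # Per-flow queues are owned by this flow instance
    for topic, sub in flow_queues:
        await backend.create_queue(topic, sub)
    # Config state is written only once the queues are confirmed ready,
    # so a successful return means the data path is fully wired
    config.flow_records[flow_id] = {"status": "running"}
```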
### Flow Stop: Two-Phase Shutdown

Stopping a flow is a two-phase transaction, with a retry window
between the phases.

**Phase 1 — Signal processors to shut down:**

1. Set the flow record's status to `"stopping"`. This marks the flow
   as in-progress so that if the flow service crashes mid-stop, it can
   identify and resume incomplete shutdowns on restart.
2. Remove the flow's variants from each processor's `active-flow`
   config entries.
3. Config push fires automatically. Each `FlowProcessor` receives the
   update, compares wanted vs current flows, and calls `stop_flow` on
   flows no longer wanted — closing consumers and producers.

**Phase 2 — Delete queues with retries, then finalise:**

1. Retry queue deletion with delays, giving processors time to react
   to the config change and disconnect. Queue deletion is idempotent —
   if a queue was already removed by a previous attempt or was never
   created, the operation succeeds silently. Only `cls["flow"]` entries
   (per-flow-instance queues) are deleted — `cls["blueprint"]` entries
   are shared infrastructure and are not touched.
2. Delete the `flow` record from config.

The flow service retries persistently. A queue that cannot be deleted
after retries is logged as an error but does not block the stop
transaction from completing — a leaked queue is less harmful than a
flow that cannot be stopped.

**Crash recovery:** On startup, the flow service scans for flow
records with `"status": "stopping"`. These represent incomplete
shutdowns from a previous run. For each, it resumes from the
appropriate point — if active-flow entries are already cleared, it
proceeds directly to phase 2 (queue deletion and flow record cleanup).
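The two phases above can be sketched as a single coroutine against hypothetical config and backend objects. All names and record shapes are illustrative, and the per-variant active-flow keys follow the restructure proposed in the companion spec:

```python
import asyncio


async def stop_flow(config, backend, flow_id, queues,
                    attempts=5, delay=0.01):
    # Phase 1: mark the record "stopping", then clear active-flow
    # entries; the resulting config push tells processors to shut
    # their consumers and producers down.
    config.flow_records[flow_id]["status"] = "stopping"
    for key in list(config.active_flow):
        if key.endswith(":" + flow_id):
            del config.active_flow[key]

    # Phase 2: retry queue deletion, giving processors time to
    # disconnect; deletion is idempotent, so repeats are safe.
    for topic, subscription in queues:
        for attempt in range(attempts):
            try:
                await backend.delete_queue(topic, subscription)
                break
            except Exception:
                await asyncio.sleep(delay)
        # a queue that survives all attempts is logged, not fatal

    # Finalise: remove the flow record.
    del config.flow_records[flow_id]
```

Crash recovery falls out of the `"stopping"` marker: a restarted service that finds such a record can simply re-run this coroutine from the appropriate point.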
### System Service Queues

System services (config, librarian, knowledge, etc.) are not managed
by the flow service. Each service calls `ensure_queue` for its own
queues during startup. These queues persist for the lifetime of the
system and are never explicitly deleted.

### Consumer Connection

Consumers never create queues. The consumer connects to a queue that
must already exist — created either by the flow service (for flow and
blueprint queues) or by the service itself (for system queues).

For RabbitMQ, this means `_connect()` no longer calls `queue_declare`.
It connects to a queue by name and fails if the queue does not exist.

For Pulsar, `subscribe()` inherently creates a subscription. This is
how Pulsar works and does not conflict with the lifecycle model —
Pulsar's broker manages subscription state, and the flow service uses
the admin API for explicit cleanup.

---
## Operational Impact

### Deployment

The flow service is a new container/process alongside the existing
config service. It requires:

- Access to the message broker (same credentials as other services —
  inherited from `AsyncProcessor` via standard CLI args).
- Access to the config service via pub/sub (config request/response
  queues — same as any other service that reads config).
- For Pulsar: the admin API URL (separate from the broker URL).

It does not require direct Cassandra access.

### Backward Compatibility

- The flow request/response queue interface is unchanged — API gateway
  and CLI tools that send flow requests continue to work without
  modification.
- The config service loses its flow handling capability, so both
  services must be deployed together. This is a breaking change in
  deployment topology but not in API.
- Queue deletion on flow stop is new behaviour. Existing deployments
  that rely on queues persisting after flow stop (e.g. for post-mortem
  message inspection) would need to drain queues before stopping flows.

---

## Assumptions

- **The flow service is the sole writer of flow configuration.** The
  two-phase stop transaction relies on the flow record's `"stopping"`
  status being authoritative — no other service or process modifies
  flow records, active-flow entries, or flow blueprints. This is true
  today (only `FlowConfig` writes to these config keys) and must remain
  true after separation. The config service provides the storage, but
  the flow service owns the semantics.

---

## Design Decisions

| Decision | Resolution | Rationale |
|---|---|---|
| Queue ownership | Every queue has exactly one explicit owner | Eliminates implicit creation, makes lifecycle auditable |
| Queue deletion strategy | Retry persistently, don't block stop | A leaked queue is less harmful than a flow stuck in stopping state |
| Purge without delete | Not needed | Flows are fully dynamic — stop and restart recreates everything |
| Blueprint-level queues | Created on flow start (idempotent), never deleted | Shared across flow instances; creation is safe, deletion would break other flows |
| Flow stop atomicity | Two-phase with `"stopping"` state | Allows crash recovery; flow service can resume incomplete shutdowns |
| Backend protocol methods | All async | Backends hide blocking I/O internally; callers never deal with threading |
| Pulsar lifecycle | REST admin API, not no-op | Persistent topics and subscriptions survive crashes; explicit cleanup needed |
| Consumer queue creation | Consumers never create queues | Single ownership; `_connect()` connects to existing queues only |
@ -11,7 +11,7 @@ def test_parameter_spec_is_a_spec_and_adds_parameter_value():
     flow = MagicMock(parameter={})
     processor = MagicMock()

-    spec.add(flow, processor, {"temperature": 0.7})
+    spec.add(flow, processor, {"parameters": {"temperature": 0.7}})

     assert isinstance(spec, Spec)
     assert "temperature" in flow.parameter
@ -1,58 +1,50 @@
 """
 Unit tests for trustgraph.base.flow_processor
 Starting small with a single test to verify basic functionality
 """

 import pytest
 from unittest.mock import AsyncMock, MagicMock, patch
 from unittest import IsolatedAsyncioTestCase

 # Import the service under test
 from trustgraph.base.flow_processor import FlowProcessor


+# Patches needed to let AsyncProcessor.__init__ run without real
+# infrastructure while still setting self.id correctly.
+ASYNC_PROCESSOR_PATCHES = [
+    patch('trustgraph.base.async_processor.get_pubsub', return_value=MagicMock()),
+    patch('trustgraph.base.async_processor.ProcessorMetrics', return_value=MagicMock()),
+    patch('trustgraph.base.async_processor.Consumer', return_value=MagicMock()),
+]
+
+
+def with_async_processor_patches(func):
+    """Apply all AsyncProcessor dependency patches to a test."""
+    for p in reversed(ASYNC_PROCESSOR_PATCHES):
+        func = p(func)
+    return func
+
+
 class TestFlowProcessorSimple(IsolatedAsyncioTestCase):
     """Test FlowProcessor base class functionality"""

-    @patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler')
-    async def test_flow_processor_initialization_basic(self, mock_register_config, mock_async_init):
+    @with_async_processor_patches
+    async def test_flow_processor_initialization_basic(self, *mocks):
         """Test basic FlowProcessor initialization"""
         # Arrange
-        mock_async_init.return_value = None
-        mock_register_config.return_value = None
-
         config = {
             'id': 'test-flow-processor',
             'taskgroup': AsyncMock()
         }

         # Act
         processor = FlowProcessor(**config)

         # Assert
-        # Verify AsyncProcessor.__init__ was called
-        mock_async_init.assert_called_once()
-
-        # Verify register_config_handler was called with the correct handler
-        mock_register_config.assert_called_once_with(
-            processor.on_configure_flows, types=["active-flow"]
-        )
-
         # Verify FlowProcessor-specific initialization
         assert hasattr(processor, 'flows')
         assert processor.id == 'test-flow-processor'
         assert processor.flows == {}
         assert hasattr(processor, 'specifications')
         assert processor.specifications == []

-    @patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler')
-    async def test_register_specification(self, mock_register_config, mock_async_init):
+    @with_async_processor_patches
+    async def test_register_specification(self, *mocks):
         """Test registering a specification"""
         # Arrange
-        mock_async_init.return_value = None
-        mock_register_config.return_value = None
-
         config = {
             'id': 'test-flow-processor',
             'taskgroup': AsyncMock()
@ -62,29 +54,23 @@ class TestFlowProcessorSimple(IsolatedAsyncioTestCase):
         mock_spec = MagicMock()
         mock_spec.name = 'test-spec'

         # Act
         processor.register_specification(mock_spec)

         # Assert
         assert len(processor.specifications) == 1
         assert processor.specifications[0] == mock_spec

     @patch('trustgraph.base.flow_processor.Flow')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler')
-    async def test_start_flow(self, mock_register_config, mock_async_init, mock_flow_class):
+    @with_async_processor_patches
+    async def test_start_flow(self, *mocks):
         """Test starting a flow"""
         # Arrange
-        mock_async_init.return_value = None
-        mock_register_config.return_value = None
+        mock_flow_class = mocks[-1]

         config = {
-            'id': 'test-flow-processor',
+            'id': 'test-processor',
             'taskgroup': AsyncMock()
         }

         processor = FlowProcessor(**config)
-        processor.id = 'test-processor'  # Set id for Flow creation

         mock_flow = AsyncMock()
         mock_flow_class.return_value = mock_flow
@ -92,58 +78,41 @@ class TestFlowProcessorSimple(IsolatedAsyncioTestCase):
         flow_name = 'test-flow'
         flow_defn = {'config': 'test-config'}

         # Act
         await processor.start_flow(flow_name, flow_defn)

         # Assert
         assert flow_name in processor.flows
         # Verify Flow was created with correct parameters
-        mock_flow_class.assert_called_once_with('test-processor', flow_name, processor, flow_defn)
-        # Verify the flow's start method was called
+        mock_flow_class.assert_called_once_with(
+            'test-processor', flow_name, processor, flow_defn
+        )
         mock_flow.start.assert_called_once()

     @patch('trustgraph.base.flow_processor.Flow')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler')
-    async def test_stop_flow(self, mock_register_config, mock_async_init, mock_flow_class):
+    @with_async_processor_patches
+    async def test_stop_flow(self, *mocks):
         """Test stopping a flow"""
         # Arrange
-        mock_async_init.return_value = None
-        mock_register_config.return_value = None
+        mock_flow_class = mocks[-1]

         config = {
-            'id': 'test-flow-processor',
+            'id': 'test-processor',
             'taskgroup': AsyncMock()
         }

         processor = FlowProcessor(**config)
-        processor.id = 'test-processor'

         mock_flow = AsyncMock()
         mock_flow_class.return_value = mock_flow

         flow_name = 'test-flow'
-        flow_defn = {'config': 'test-config'}
+        await processor.start_flow(flow_name, {'config': 'test-config'})

-        # Start a flow first
-        await processor.start_flow(flow_name, flow_defn)
-
         # Act
         await processor.stop_flow(flow_name)

         # Assert
         assert flow_name not in processor.flows
         mock_flow.stop.assert_called_once()

     @patch('trustgraph.base.flow_processor.Flow')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler')
-    async def test_stop_flow_not_exists(self, mock_register_config, mock_async_init, mock_flow_class):
+    @with_async_processor_patches
+    async def test_stop_flow_not_exists(self, *mocks):
         """Test stopping a flow that doesn't exist"""
         # Arrange
-        mock_async_init.return_value = None
-        mock_register_config.return_value = None
-
         config = {
             'id': 'test-flow-processor',
             'taskgroup': AsyncMock()
@ -151,167 +120,122 @@ class TestFlowProcessorSimple(IsolatedAsyncioTestCase):

         processor = FlowProcessor(**config)

         # Act - should not raise an exception
         await processor.stop_flow('non-existent-flow')

         # Assert - flows dict should still be empty
         assert processor.flows == {}

     @patch('trustgraph.base.flow_processor.Flow')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler')
-    async def test_on_configure_flows_basic(self, mock_register_config, mock_async_init, mock_flow_class):
+    @with_async_processor_patches
+    async def test_on_configure_flows_basic(self, *mocks):
         """Test basic flow configuration handling"""
         # Arrange
-        mock_async_init.return_value = None
-        mock_register_config.return_value = None
+        mock_flow_class = mocks[-1]

         config = {
-            'id': 'test-flow-processor',
+            'id': 'test-processor',
             'taskgroup': AsyncMock()
         }

         processor = FlowProcessor(**config)
-        processor.id = 'test-processor'

         mock_flow = AsyncMock()
         mock_flow_class.return_value = mock_flow

         # Configuration with flows for this processor
-        flow_config = {
-            'test-flow': {'config': 'test-config'}
-        }
         config_data = {
             'active-flow': {
-                'test-processor': '{"test-flow": {"config": "test-config"}}'
+                'processor:test-processor': {
+                    'test-flow': '{"config": "test-config"}'
+                }
             }
         }

         # Act
         await processor.on_configure_flows(config_data, version=1)

         # Assert
         assert 'test-flow' in processor.flows
-        mock_flow_class.assert_called_once_with('test-processor', 'test-flow', processor, {'config': 'test-config'})
+        mock_flow_class.assert_called_once_with(
+            'test-processor', 'test-flow', processor,
+            {'config': 'test-config'}
+        )
         mock_flow.start.assert_called_once()

     @patch('trustgraph.base.flow_processor.Flow')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler')
-    async def test_on_configure_flows_no_config(self, mock_register_config, mock_async_init, mock_flow_class):
+    @with_async_processor_patches
+    async def test_on_configure_flows_no_config(self, *mocks):
         """Test flow configuration handling when no config exists for this processor"""
         # Arrange
-        mock_async_init.return_value = None
-        mock_register_config.return_value = None
-
         config = {
-            'id': 'test-flow-processor',
+            'id': 'test-processor',
             'taskgroup': AsyncMock()
         }

         processor = FlowProcessor(**config)
-        processor.id = 'test-processor'

         # Configuration without flows for this processor
         config_data = {
             'active-flow': {
-                'other-processor': '{"other-flow": {"config": "other-config"}}'
+                'processor:other-processor': {
+                    'other-flow': '{"config": "other-config"}'
+                }
             }
         }

         # Act
         await processor.on_configure_flows(config_data, version=1)

         # Assert
         assert processor.flows == {}
         mock_flow_class.assert_not_called()

     @patch('trustgraph.base.flow_processor.Flow')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler')
-    async def test_on_configure_flows_invalid_config(self, mock_register_config, mock_async_init, mock_flow_class):
+    @with_async_processor_patches
+    async def test_on_configure_flows_invalid_config(self, *mocks):
         """Test flow configuration handling with invalid config format"""
         # Arrange
-        mock_async_init.return_value = None
-        mock_register_config.return_value = None
-
         config = {
-            'id': 'test-flow-processor',
+            'id': 'test-processor',
             'taskgroup': AsyncMock()
         }

         processor = FlowProcessor(**config)
-        processor.id = 'test-processor'

         # Configuration without active-flow key
         config_data = {
             'other-data': 'some-value'
         }

         # Act
         await processor.on_configure_flows(config_data, version=1)

         # Assert
         assert processor.flows == {}
         mock_flow_class.assert_not_called()

     @patch('trustgraph.base.flow_processor.Flow')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler')
-    async def test_on_configure_flows_start_and_stop(self, mock_register_config, mock_async_init, mock_flow_class):
+    @with_async_processor_patches
+    async def test_on_configure_flows_start_and_stop(self, *mocks):
         """Test flow configuration handling with starting and stopping flows"""
         # Arrange
-        mock_async_init.return_value = None
-        mock_register_config.return_value = None
+        mock_flow_class = mocks[-1]

         config = {
-            'id': 'test-flow-processor',
+            'id': 'test-processor',
             'taskgroup': AsyncMock()
         }

         processor = FlowProcessor(**config)
-        processor.id = 'test-processor'

         mock_flow1 = AsyncMock()
         mock_flow2 = AsyncMock()
         mock_flow_class.side_effect = [mock_flow1, mock_flow2]

         # First configuration - start flow1
         config_data1 = {
             'active-flow': {
-                'test-processor': '{"flow1": {"config": "config1"}}'
+                'processor:test-processor': {
+                    'flow1': '{"config": "config1"}'
+                }
             }
         }

         await processor.on_configure_flows(config_data1, version=1)

         # Second configuration - stop flow1, start flow2
         config_data2 = {
             'active-flow': {
-                'test-processor': '{"flow2": {"config": "config2"}}'
+                'processor:test-processor': {
+                    'flow2': '{"config": "config2"}'
+                }
             }
         }

         # Act
         await processor.on_configure_flows(config_data2, version=2)

         # Assert
         # flow1 should be stopped and removed
         assert 'flow1' not in processor.flows
         mock_flow1.stop.assert_called_once()

         # flow2 should be started and added
         assert 'flow2' in processor.flows
         mock_flow2.start.assert_called_once()

-    @patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
-    @patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler')
+    @with_async_processor_patches
     @patch('trustgraph.base.async_processor.AsyncProcessor.start')
-    async def test_start_calls_parent(self, mock_parent_start, mock_register_config, mock_async_init):
+    async def test_start_calls_parent(self, mock_parent_start, *mocks):
         """Test that start() calls parent start method"""
         # Arrange
-        mock_async_init.return_value = None
-        mock_register_config.return_value = None
         mock_parent_start.return_value = None

         config = {
@ -321,27 +245,17 @@ class TestFlowProcessorSimple(IsolatedAsyncioTestCase):
|
|||
|
||||
processor = FlowProcessor(**config)
|
||||
|
||||
# Act
|
||||
await processor.start()
|
||||
|
||||
# Assert
|
||||
mock_parent_start.assert_called_once()
|
||||
|
||||
@patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
|
||||
@patch('trustgraph.base.async_processor.AsyncProcessor.register_config_handler')
|
||||
async def test_add_args_calls_parent(self, mock_register_config, mock_async_init):
|
||||
async def test_add_args_calls_parent(self):
|
||||
"""Test that add_args() calls parent add_args method"""
|
||||
# Arrange
|
||||
mock_async_init.return_value = None
|
||||
mock_register_config.return_value = None
|
||||
|
||||
mock_parser = MagicMock()
|
||||
|
||||
# Act
|
||||
with patch('trustgraph.base.async_processor.AsyncProcessor.add_args') as mock_parent_add_args:
|
||||
FlowProcessor.add_args(mock_parser)
|
||||
|
||||
# Assert
|
||||
mock_parent_add_args.assert_called_once_with(mock_parser)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@@ -30,8 +30,8 @@ def mock_flow_config():
    mock_config.flows = {
        "test-flow": {
            "interfaces": {
-               "triples-store": "test-triples-queue",
-               "graph-embeddings-store": "test-ge-queue"
+               "triples-store": {"flow": "test-triples-queue"},
+               "graph-embeddings-store": {"flow": "test-ge-queue"}
            }
        }
    }

@@ -121,7 +121,7 @@ class TestConfigReceiver:
            fetch_calls.append(kwargs)
        config_receiver.fetch_and_apply = mock_fetch

-       for type_name in ["flow", "active-flow"]:
+       for type_name in ["flow"]:
            fetch_calls.clear()
            config_receiver.config_version = 1

@@ -277,7 +277,7 @@ class TestDispatcherManager:
        # Setup test flow
        manager.flows["test_flow"] = {
            "interfaces": {
-               "triples-store": {"queue": "test_queue"}
+               "triples-store": {"flow": "test_queue"}
            }
        }

@@ -298,7 +298,7 @@ class TestDispatcherManager:
            backend=mock_backend,
            ws="ws",
            running="running",
-           queue={"queue": "test_queue"}
+           queue="test_queue"
        )
        mock_dispatcher.start.assert_called_once()
        assert result == mock_dispatcher

@@ -328,7 +328,7 @@ class TestDispatcherManager:
        # Setup test flow
        manager.flows["test_flow"] = {
            "interfaces": {
-               "triples-store": {"queue": "test_queue"}
+               "triples-store": {"flow": "test_queue"}
            }
        }

@@ -350,7 +350,7 @@ class TestDispatcherManager:
        # Setup test flow
        manager.flows["test_flow"] = {
            "interfaces": {
-               "triples-store": {"queue": "test_queue"}
+               "triples-store": {"flow": "test_queue"}
            }
        }

@@ -370,7 +370,7 @@ class TestDispatcherManager:
            backend=mock_backend,
            ws="ws",
            running="running",
-           queue={"queue": "test_queue"},
+           queue="test_queue",
            consumer="api-gateway-test-uuid",
            subscriber="api-gateway-test-uuid"
        )

@@ -481,7 +481,7 @@ class TestDispatcherManager:
        # Setup test flow
        manager.flows["test_flow"] = {
            "interfaces": {
-               "text-load": {"queue": "text_load_queue"}
+               "text-load": {"flow": "text_load_queue"}
            }
        }

@@ -502,7 +502,7 @@ class TestDispatcherManager:
        # Verify dispatcher was created with correct parameters
        mock_dispatcher_class.assert_called_once_with(
            backend=mock_backend,
-           queue={"queue": "text_load_queue"}
+           queue="text_load_queue"
        )
        mock_dispatcher.start.assert_called_once()
        mock_dispatcher.process.assert_called_once_with("data", "responder")

@@ -5,7 +5,7 @@ from . consumer import Consumer
from . producer import Producer
from . publisher import Publisher
from . subscriber import Subscriber
-from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics
+from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics, SubscriberMetrics
from . logging import add_logging_args, setup_logging
from . flow_processor import FlowProcessor
from . consumer_spec import ConsumerSpec

@@ -22,6 +22,7 @@ from . text_completion_client import (
    TextCompletionClientSpec, TextCompletionClient, TextCompletionResult,
)
from . prompt_client import PromptClientSpec, PromptClient, PromptResult
+from . config_client import ConfigClientSpec, ConfigClient
from . triples_store_service import TriplesStoreService
from . graph_embeddings_store_service import GraphEmbeddingsStoreService
from . document_embeddings_store_service import DocumentEmbeddingsStoreService

@@ -159,6 +159,62 @@ class PubSubBackend(Protocol):
        """
        ...

+   async def create_queue(self, topic: str, subscription: str) -> None:
+       """
+       Pre-create a queue so it exists before any consumer connects.
+
+       The topic and subscription together identify the queue, mirroring
+       create_consumer where the queue name is derived from both.
+
+       Idempotent — creating an already-existing queue succeeds silently.
+
+       Args:
+           topic: Queue identifier in class:topicspace:topic format
+           subscription: Subscription/consumer group name
+       """
+       ...
+
+   async def delete_queue(self, topic: str, subscription: str) -> None:
+       """
+       Delete a queue and any messages it contains.
+
+       The topic and subscription together identify the queue, mirroring
+       create_consumer where the queue name is derived from both.
+
+       Idempotent — deleting a non-existent queue succeeds silently.
+
+       Args:
+           topic: Queue identifier in class:topicspace:topic format
+           subscription: Subscription/consumer group name
+       """
+       ...
+
+   async def queue_exists(self, topic: str, subscription: str) -> bool:
+       """
+       Check whether a queue exists.
+
+       Args:
+           topic: Queue identifier in class:topicspace:topic format
+           subscription: Subscription/consumer group name
+
+       Returns:
+           True if the queue exists, False otherwise.
+       """
+       ...
+
+   async def ensure_queue(self, topic: str, subscription: str) -> None:
+       """
+       Ensure a queue exists, creating it if necessary.
+
+       Convenience wrapper — checks existence, creates if missing.
+       Used by system services on startup.
+
+       Args:
+           topic: Queue identifier in class:topicspace:topic format
+           subscription: Subscription/consumer group name
+       """
+       ...
+
    def close(self) -> None:
        """Close the backend connection."""
        ...

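The queue-management contract added to the protocol above can be illustrated with a minimal in-memory sketch (this `InMemoryBackend` class is hypothetical, not part of the codebase): `(topic, subscription)` identifies the queue, create and delete are idempotent, and `ensure_queue` is check-then-create.

```python
# Hypothetical in-memory backend illustrating the protocol's
# queue-management semantics.

class InMemoryBackend:
    def __init__(self):
        # (topic, subscription) -> list of queued messages
        self.queues = {}

    async def create_queue(self, topic: str, subscription: str) -> None:
        # Idempotent: creating an existing queue succeeds silently
        self.queues.setdefault((topic, subscription), [])

    async def delete_queue(self, topic: str, subscription: str) -> None:
        # Idempotent: deleting a missing queue succeeds silently
        self.queues.pop((topic, subscription), None)

    async def queue_exists(self, topic: str, subscription: str) -> bool:
        return (topic, subscription) in self.queues

    async def ensure_queue(self, topic: str, subscription: str) -> None:
        # The protocol's convenience wrapper: check, then create
        if not await self.queue_exists(topic, subscription):
            await self.create_queue(topic, subscription)
```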
trustgraph-base/trustgraph/base/config_client.py (new file, 92 lines)

@@ -0,0 +1,92 @@

from . request_response_spec import RequestResponse, RequestResponseSpec
from .. schema import ConfigRequest, ConfigResponse, ConfigKey, ConfigValue

CONFIG_TIMEOUT = 10


class ConfigClient(RequestResponse):

    async def _request(self, timeout=CONFIG_TIMEOUT, **kwargs):
        resp = await self.request(
            ConfigRequest(**kwargs),
            timeout=timeout,
        )
        if resp.error:
            raise RuntimeError(
                f"{resp.error.type}: {resp.error.message}"
            )
        return resp

    async def get(self, type, key, timeout=CONFIG_TIMEOUT):
        """Get a single config value. Returns the value string or None."""
        resp = await self._request(
            operation="get",
            keys=[ConfigKey(type=type, key=key)],
            timeout=timeout,
        )
        if resp.values and len(resp.values) > 0:
            return resp.values[0].value
        return None

    async def put(self, type, key, value, timeout=CONFIG_TIMEOUT):
        """Put a single config value."""
        await self._request(
            operation="put",
            values=[ConfigValue(type=type, key=key, value=value)],
            timeout=timeout,
        )

    async def put_many(self, values, timeout=CONFIG_TIMEOUT):
        """Put multiple config values in a single request.
        values is a list of (type, key, value) tuples."""
        await self._request(
            operation="put",
            values=[
                ConfigValue(type=t, key=k, value=v)
                for t, k, v in values
            ],
            timeout=timeout,
        )

    async def delete(self, type, key, timeout=CONFIG_TIMEOUT):
        """Delete a single config key."""
        await self._request(
            operation="delete",
            keys=[ConfigKey(type=type, key=key)],
            timeout=timeout,
        )

    async def delete_many(self, keys, timeout=CONFIG_TIMEOUT):
        """Delete multiple config keys in a single request.
        keys is a list of (type, key) tuples."""
        await self._request(
            operation="delete",
            keys=[
                ConfigKey(type=t, key=k)
                for t, k in keys
            ],
            timeout=timeout,
        )

    async def keys(self, type, timeout=CONFIG_TIMEOUT):
        """List all keys for a config type."""
        resp = await self._request(
            operation="list",
            type=type,
            timeout=timeout,
        )
        return resp.directory


class ConfigClientSpec(RequestResponseSpec):
    def __init__(
        self, request_name, response_name,
    ):
        super(ConfigClientSpec, self).__init__(
            request_name=request_name,
            request_schema=ConfigRequest,
            response_name=response_name,
            response_schema=ConfigResponse,
            impl=ConfigClient,
        )

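With per-variant keys, starting or stopping a flow maps directly onto `put_many`/`delete_many` batches, with no read-modify-write of a per-processor blob. A small sketch of that mapping (the helper names here are illustrative, not from the codebase):

```python
import json

# Sketch of the restructured key scheme: each flow variant gets its own
# ('processor:<id>', '<variant>') config entry, so a flow start is a set
# of blind writes and a flow stop is a set of blind deletes.

def start_flow_entries(processor_flows):
    """Map {processor: {variant: config}} to (type, key, value) tuples,
    the shape ConfigClient.put_many() expects."""
    return [
        (f"processor:{proc}", variant, json.dumps(cfg))
        for proc, variants in processor_flows.items()
        for variant, cfg in variants.items()
    ]

def stop_flow_keys(processor_flows):
    """Map the same structure to (type, key) tuples for delete_many()."""
    return [
        (f"processor:{proc}", variant)
        for proc, variants in processor_flows.items()
        for variant in variants
    ]
```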
@@ -23,7 +23,7 @@ class ConsumerSpec(Spec):
            taskgroup = processor.taskgroup,
            flow = flow,
            backend = processor.pubsub,
-           topic = definition[self.name],
+           topic = definition["topics"][self.name],
            subscriber = processor.id + "--" + flow.name + "--" + self.name,
            schema = self.schema,
            handler = self.handler,

@@ -29,9 +29,9 @@ class FlowProcessor(AsyncProcessor):
        # Initialise base class
        super(FlowProcessor, self).__init__(**params)

-       # Register configuration handler
+       # Register configuration handler for this processor's config type
        self.register_config_handler(
-           self.on_configure_flows, types=["active-flow"]
+           self.on_configure_flows, types=[f"processor:{self.id}"]
        )

        # Initialise flow information state

@@ -66,17 +66,16 @@ class FlowProcessor(AsyncProcessor):

        logger.info(f"Got config version {version}")

-       # Skip over invalid data
-       if "active-flow" not in config: return
-
-       # Check there's configuration information for me
-       if self.id in config["active-flow"]:
-
-           # Get my flow config
-           flow_config = json.loads(config["active-flow"][self.id])
+       config_type = f"processor:{self.id}"
+
+       # Get my flow config — each key is a variant, each value is
+       # the JSON config for that flow variant
+       if config_type in config:
+           flow_config = {
+               k: json.loads(v)
+               for k, v in config[config_type].items()
+           }
        else:
            logger.debug("No configuration settings for me.")
            flow_config = {}

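The new config-handling shape above can be exercised in isolation. A minimal sketch (the standalone function is illustrative; in the codebase this logic lives inside `on_configure_flows`):

```python
import json

# Sketch: config pushed to a processor keys its flow variants under
# "processor:<id>", each value being a JSON string for one variant.

def extract_flow_config(config, processor_id):
    config_type = f"processor:{processor_id}"
    if config_type not in config:
        # No configuration settings for this processor
        return {}
    # One JSON parse per variant, no per-processor blob
    return {k: json.loads(v) for k, v in config[config_type].items()}
```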
@@ -18,6 +18,6 @@ class ParameterSpec(Spec):

    def add(self, flow: Any, processor: Any, definition: dict[str, Any]) -> None:

-       value = definition.get(self.name, None)
+       value = definition.get("parameters", {}).get(self.name, None)

        flow.parameter[self.name] = Parameter(value)

@@ -19,7 +19,7 @@ class ProducerSpec(Spec):

        producer = Producer(
            backend = processor.pubsub,
-           topic = definition[self.name],
+           topic = definition["topics"][self.name],
            schema = self.schema,
            metrics = producer_metrics,
        )

@@ -266,6 +266,26 @@ class PulsarBackend:

        return PulsarBackendConsumer(pulsar_consumer, schema)

+   async def create_queue(self, topic: str, subscription: str) -> None:
+       """No-op — Pulsar auto-creates topics on first use.
+       TODO: Use admin REST API for explicit persistent topic creation."""
+       pass
+
+   async def delete_queue(self, topic: str, subscription: str) -> None:
+       """No-op — to be replaced with admin REST API calls.
+       TODO: Delete subscription and persistent topic via admin API."""
+       pass
+
+   async def queue_exists(self, topic: str, subscription: str) -> bool:
+       """Returns True — Pulsar auto-creates on subscribe.
+       TODO: Use admin REST API for actual existence check."""
+       return True
+
+   async def ensure_queue(self, topic: str, subscription: str) -> None:
+       """No-op — Pulsar auto-creates topics on first use.
+       TODO: Use admin REST API for explicit creation."""
+       pass
+
    def close(self) -> None:
        """Close the Pulsar client."""
        self.client.close()

@@ -19,6 +19,7 @@ Uses basic_consume (push) instead of basic_get (polling) for
efficient message delivery.
"""

+import asyncio
import json
import time
import logging

@@ -170,21 +171,23 @@ class RabbitMQBackendConsumer:
        self._connection = pika.BlockingConnection(self._connection_params)
        self._channel = self._connection.channel()

-       # Declare the topic exchange
+       # Declare the topic exchange (idempotent, also done by producers)
        self._channel.exchange_declare(
            exchange=self._exchange_name,
            exchange_type='topic',
            durable=True,
        )

        # Declare the queue — anonymous if exclusive
+       if self._exclusive:
+           # Anonymous ephemeral queue (response/notify class).
+           # These are per-consumer and must be created here — the
+           # broker assigns the name.
            result = self._channel.queue_declare(
-               queue=self._queue_name,
-               durable=self._durable,
-               exclusive=self._exclusive,
-               auto_delete=self._auto_delete,
+               queue='',
+               durable=False,
+               exclusive=True,
+               auto_delete=True,
            )
            # Capture actual name (important for anonymous queues where name='')
            self._queue_name = result.method.queue

            self._channel.queue_bind(

@@ -192,6 +195,13 @@ class RabbitMQBackendConsumer:
                exchange=self._exchange_name,
                routing_key=self._routing_key,
            )
+       else:
+           # Named queue (flow/request class). Queue must already
+           # exist — created by the flow service or ensure_queue.
+           # We just verify it exists and bind to consume.
+           self._channel.queue_declare(
+               queue=self._queue_name, passive=True,
+           )

        self._channel.basic_qos(prefetch_count=1)

@@ -409,5 +419,124 @@ class RabbitMQBackend:
            queue_name, schema, queue_durable, exclusive, auto_delete,
        )

+   def _create_queue_sync(self, exchange, routing_key, queue_name, durable):
+       """Blocking queue creation — run via asyncio.to_thread."""
+       connection = None
+       try:
+           connection = pika.BlockingConnection(self._connection_params)
+           channel = connection.channel()
+           channel.exchange_declare(
+               exchange=exchange,
+               exchange_type='topic',
+               durable=True,
+           )
+           channel.queue_declare(
+               queue=queue_name,
+               durable=durable,
+               exclusive=False,
+               auto_delete=False,
+           )
+           channel.queue_bind(
+               queue=queue_name,
+               exchange=exchange,
+               routing_key=routing_key,
+           )
+           logger.info(f"Created queue: {queue_name}")
+       finally:
+           if connection and connection.is_open:
+               try:
+                   connection.close()
+               except Exception:
+                   pass
+
+   async def create_queue(self, topic: str, subscription: str) -> None:
+       """Pre-create a named queue bound to the topic exchange.
+
+       Only applies to shared queues (flow/request class). Response and
+       notify queues are anonymous/auto-delete and created by consumers.
+       """
+       exchange, routing_key, cls, durable = self._parse_queue_id(topic)
+
+       if cls in ('response', 'notify'):
+           return
+
+       queue_name = f"{exchange}.{routing_key}.{subscription}"
+       await asyncio.to_thread(
+           self._create_queue_sync, exchange, routing_key,
+           queue_name, durable,
+       )
+
+   def _delete_queue_sync(self, queue_name):
+       """Blocking queue deletion — run via asyncio.to_thread."""
+       connection = None
+       try:
+           connection = pika.BlockingConnection(self._connection_params)
+           channel = connection.channel()
+           channel.queue_delete(queue=queue_name)
+           logger.info(f"Deleted queue: {queue_name}")
+       except Exception as e:
+           # Idempotent — queue may already be gone
+           logger.debug(f"Queue delete for {queue_name}: {e}")
+       finally:
+           if connection and connection.is_open:
+               try:
+                   connection.close()
+               except Exception:
+                   pass
+
+   async def delete_queue(self, topic: str, subscription: str) -> None:
+       """Delete a named queue and any messages it contains.
+
+       Only applies to shared queues (flow/request class). Response and
+       notify queues are anonymous/auto-delete and managed by the broker.
+       """
+       exchange, routing_key, cls, durable = self._parse_queue_id(topic)
+
+       if cls in ('response', 'notify'):
+           return
+
+       queue_name = f"{exchange}.{routing_key}.{subscription}"
+       await asyncio.to_thread(self._delete_queue_sync, queue_name)
+
+   def _queue_exists_sync(self, queue_name):
+       """Blocking queue existence check — run via asyncio.to_thread.
+       Uses passive=True which checks without creating."""
+       connection = None
+       try:
+           connection = pika.BlockingConnection(self._connection_params)
+           channel = connection.channel()
+           channel.queue_declare(queue=queue_name, passive=True)
+           return True
+       except pika.exceptions.ChannelClosedByBroker:
+           # 404 NOT_FOUND — queue does not exist
+           return False
+       finally:
+           if connection and connection.is_open:
+               try:
+                   connection.close()
+               except Exception:
+                   pass
+
+   async def queue_exists(self, topic: str, subscription: str) -> bool:
+       """Check whether a named queue exists.
+
+       Only applies to shared queues (flow/request class). Response and
+       notify queues are anonymous/ephemeral — always returns False.
+       """
+       exchange, routing_key, cls, durable = self._parse_queue_id(topic)
+
+       if cls in ('response', 'notify'):
+           return False
+
+       queue_name = f"{exchange}.{routing_key}.{subscription}"
+       return await asyncio.to_thread(
+           self._queue_exists_sync, queue_name
+       )
+
+   async def ensure_queue(self, topic: str, subscription: str) -> None:
+       """Ensure a queue exists, creating it if necessary."""
+       if not await self.queue_exists(topic, subscription):
+           await self.create_queue(topic, subscription)
+
    def close(self) -> None:
        pass

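The queue-management methods above all share one naming convention: a topic in `class:topicspace:topic` form plus a subscription yields the RabbitMQ queue name `<exchange>.<routing_key>.<subscription>`. A pure-Python sketch of that derivation (the parsing shown is an assumption mirroring what `_parse_queue_id` appears to do, not its actual implementation):

```python
# Assumed mapping: class -> exchange, "topicspace.topic" -> routing key.
# Response/notify queues are anonymous and broker-managed, so they have
# no derivable name here.

def queue_name_for(topic: str, subscription: str):
    cls, topicspace, name = topic.split(":", 2)
    if cls in ("response", "notify"):
        return None  # anonymous, broker-assigned queue
    exchange = cls
    routing_key = f"{topicspace}.{name}"
    return f"{exchange}.{routing_key}.{subscription}"
```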
@@ -137,10 +137,10 @@ class RequestResponseSpec(Spec):
                "--" + str(uuid.uuid4())
            ),
            consumer_name = flow.id,
-           request_topic = definition[self.request_name],
+           request_topic = definition["topics"][self.request_name],
            request_schema = self.request_schema,
            request_metrics = request_metrics,
-           response_topic = definition[self.response_name],
+           response_topic = definition["topics"][self.response_name],
            response_schema = self.response_schema,
            response_metrics = response_metrics,
        )

@@ -20,7 +20,7 @@ class SubscriberSpec(Spec):

        subscriber = Subscriber(
            backend = processor.pubsub,
-           topic = definition[self.name],
+           topic = definition["topics"][self.name],
            subscription = flow.id,
            consumer_name = flow.id,
            schema = self.schema,

@@ -33,7 +33,7 @@ def describe_interfaces(intdefs, flow):
            lst.append(f"{k} response: {resp}")

        if kind == "send":
-           q = intfs[k]
+           q = intfs[k]["flow"]

            lst.append(f"{k}: {q}")

@@ -61,6 +61,7 @@ api-gateway = "trustgraph.gateway:run"
chunker-recursive = "trustgraph.chunking.recursive:run"
chunker-token = "trustgraph.chunking.token:run"
config-svc = "trustgraph.config.service:run"
+flow-svc = "trustgraph.flow.service:run"
doc-embeddings-query-milvus = "trustgraph.query.doc_embeddings.milvus:run"
doc-embeddings-query-pinecone = "trustgraph.query.doc_embeddings.pinecone:run"
doc-embeddings-query-qdrant = "trustgraph.query.doc_embeddings.qdrant:run"

@@ -11,14 +11,10 @@ from trustgraph.schema import ConfigRequest, ConfigResponse, ConfigPush
from trustgraph.schema import config_request_queue, config_response_queue
from trustgraph.schema import config_push_queue

-from trustgraph.schema import FlowRequest, FlowResponse
-from trustgraph.schema import flow_request_queue, flow_response_queue
-
from trustgraph.base import AsyncProcessor, Consumer, Producer
from trustgraph.base.cassandra_config import add_cassandra_args, resolve_cassandra_config

from . config import Configuration
-from . flow import FlowConfig

from ... base import ProcessorMetrics, ConsumerMetrics, ProducerMetrics
from ... base import Consumer, Producer

@@ -32,9 +28,6 @@ default_config_request_queue = config_request_queue
default_config_response_queue = config_response_queue
default_config_push_queue = config_push_queue

-default_flow_request_queue = flow_request_queue
-default_flow_response_queue = flow_response_queue
-
default_cassandra_host = "cassandra"

class Processor(AsyncProcessor):

@@ -51,13 +44,6 @@ class Processor(AsyncProcessor):
            "config_push_queue", default_config_push_queue
        )

-       flow_request_queue = params.get(
-           "flow_request_queue", default_flow_request_queue
-       )
-       flow_response_queue = params.get(
-           "flow_response_queue", default_flow_response_queue
-       )
-
        cassandra_host = params.get("cassandra_host")
        cassandra_username = params.get("cassandra_username")
        cassandra_password = params.get("cassandra_password")

@@ -77,16 +63,11 @@ class Processor(AsyncProcessor):

        id = params.get("id")

-       flow_request_schema = FlowRequest
-       flow_response_schema = FlowResponse
-
        super(Processor, self).__init__(
            **params | {
                "config_request_schema": ConfigRequest.__name__,
                "config_response_schema": ConfigResponse.__name__,
                "config_push_schema": ConfigPush.__name__,
-               "flow_request_schema": FlowRequest.__name__,
-               "flow_response_schema": FlowResponse.__name__,
                "cassandra_host": self.cassandra_host,
                "cassandra_username": self.cassandra_username,
                "cassandra_password": self.cassandra_password,

@@ -103,12 +84,8 @@ class Processor(AsyncProcessor):
            processor = self.id, flow = None, name = "config-push"
        )

-       flow_request_metrics = ConsumerMetrics(
-           processor = self.id, flow = None, name = "flow-request"
-       )
-       flow_response_metrics = ProducerMetrics(
-           processor = self.id, flow = None, name = "flow-response"
-       )
+       self.config_request_topic = config_request_queue
+       self.config_request_subscriber = id

        self.config_request_consumer = Consumer(
            taskgroup = self.taskgroup,

@@ -135,24 +112,6 @@ class Processor(AsyncProcessor):
            metrics = config_push_metrics,
        )

-       self.flow_request_consumer = Consumer(
-           taskgroup = self.taskgroup,
-           backend = self.pubsub,
-           flow = None,
-           topic = flow_request_queue,
-           subscriber = id,
-           schema = FlowRequest,
-           handler = self.on_flow_request,
-           metrics = flow_request_metrics,
-       )
-
-       self.flow_response_producer = Producer(
-           backend = self.pubsub,
-           topic = flow_response_queue,
-           schema = FlowResponse,
-           metrics = flow_response_metrics,
-       )
-
        self.config = Configuration(
            host = self.cassandra_host,
            username = self.cassandra_username,

@@ -161,15 +120,15 @@ class Processor(AsyncProcessor):
            push = self.push
        )

-       self.flow = FlowConfig(self.config)
-
        logger.info("Config service initialized")

    async def start(self):

+       await self.pubsub.ensure_queue(
+           self.config_request_topic, self.config_request_subscriber
+       )
        await self.push() # Startup poke: empty types = everything
        await self.config_request_consumer.start()
-       await self.flow_request_consumer.start()

    async def push(self, types=None):

@@ -193,7 +152,7 @@ class Processor(AsyncProcessor):
        # Sender-produced ID
        id = msg.properties()["id"]

-       logger.info(f"Handling config request {id}...")
+       logger.debug(f"Handling config request {id}...")

        resp = await self.config.handle(v)

@@ -214,36 +173,6 @@ class Processor(AsyncProcessor):
            resp, properties={"id": id}
        )

-   async def on_flow_request(self, msg, consumer, flow):
-
-       try:
-
-           v = msg.value()
-
-           # Sender-produced ID
-           id = msg.properties()["id"]
-
-           logger.info(f"Handling flow request {id}...")
-
-           resp = await self.flow.handle(v)
-
-           await self.flow_response_producer.send(
-               resp, properties={"id": id}
-           )
-
-       except Exception as e:
-
-           resp = FlowResponse(
-               error=Error(
-                   type = "flow-error",
-                   message = str(e),
-               ),
-           )
-
-           await self.flow_response_producer.send(
-               resp, properties={"id": id}
-           )
-
    @staticmethod
    def add_args(parser):

@@ -263,18 +192,6 @@ class Processor(AsyncProcessor):

        # Note: --config-push-queue is already added by AsyncProcessor.add_args()

-       parser.add_argument(
-           '--flow-request-queue',
-           default=default_flow_request_queue,
-           help=f'Flow request queue (default: {default_flow_request_queue})'
-       )
-
-       parser.add_argument(
-           '--flow-response-queue',
-           default=default_flow_response_queue,
-           help=f'Flow response queue {default_flow_response_queue}',
-       )
-
        add_cassandra_args(parser)

def run():

@@ -192,8 +192,8 @@ class KnowledgeManager:
        if "graph-embeddings-store" not in flow["interfaces"]:
            raise RuntimeError("Flow has no graph-embeddings-store")

-       t_q = flow["interfaces"]["triples-store"]
-       ge_q = flow["interfaces"]["graph-embeddings-store"]
+       t_q = flow["interfaces"]["triples-store"]["flow"]
+       ge_q = flow["interfaces"]["graph-embeddings-store"]["flow"]

        # Got this far, it should all work
        await respond(

@@ -82,6 +82,9 @@ class Processor(AsyncProcessor):
            processor = self.id, flow = None, name = "knowledge-response"
        )

+       self.knowledge_request_topic = knowledge_request_queue
+       self.knowledge_request_subscriber = id
+
        self.knowledge_request_consumer = Consumer(
            taskgroup = self.taskgroup,
            backend = self.pubsub,

@@ -116,6 +119,9 @@ class Processor(AsyncProcessor):

    async def start(self):

+       await self.pubsub.ensure_queue(
+           self.knowledge_request_topic, self.knowledge_request_subscriber
+       )
        await super(Processor, self).start()
        await self.knowledge_request_consumer.start()
        await self.knowledge_response_producer.start()

trustgraph-flow/trustgraph/flow/__init__.py (new file, 2 lines)

from . service import *

trustgraph-flow/trustgraph/flow/service/__init__.py (new file, 2 lines)

from . service import *

trustgraph-flow/trustgraph/flow/service/__main__.py (new file, 6 lines)

#!/usr/bin/env python3

from . service import run

if __name__ == '__main__':
    run()

@ -1,15 +1,22 @@
|
|||
|
||||
from trustgraph.schema import FlowResponse, Error
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
|
||||
# Module logger
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Queue deletion retry settings
|
||||
DELETE_RETRIES = 5
|
||||
DELETE_RETRY_DELAY = 2 # seconds
|
||||
|
||||
|
||||
class FlowConfig:
|
||||
def __init__(self, config):
|
||||
def __init__(self, config, pubsub):
|
||||
|
||||
self.config = config
|
||||
self.pubsub = pubsub
|
||||
# Cache for parameter type definitions to avoid repeated lookups
|
||||
self.param_type_cache = {}
|
||||
|
||||
|
|
@ -22,9 +29,12 @@ class FlowConfig:
|
|||
user_params: User-provided parameters dict (may be None or empty)
|
||||
|
||||
Returns:
|
||||
Complete parameter dict with user values and defaults merged (all values as strings)
|
||||
Complete parameter dict with user values and defaults merged
|
||||
(all values as strings)
|
||||
"""
|
||||
|
||||
# If the flow blueprint has no parameters section, return user params as-is (stringified)
|
||||
|
||||
if "parameters" not in flow_blueprint:
|
||||
if not user_params:
|
||||
return {}
|
||||
|
|
@ -49,7 +59,9 @@ class FlowConfig:
|
|||
if param_type not in self.param_type_cache:
|
||||
try:
|
||||
# Fetch parameter type definition from config store
|
||||
type_def = await self.config.get("parameter-type").get(param_type)
|
||||
type_def = await self.config.get(
|
||||
"parameter-type", param_type
|
||||
)
|
||||
if type_def:
|
||||
self.param_type_cache[param_type] = json.loads(type_def)
|
||||
else:
|
||||
|
|
@@ -102,7 +114,7 @@ class FlowConfig:

     async def handle_list_blueprints(self, msg):

-        names = list(await self.config.get("flow-blueprint").keys())
+        names = list(await self.config.keys("flow-blueprint"))

         return FlowResponse(
             error = None,
@@ -114,20 +126,17 @@ class FlowConfig:
         return FlowResponse(
             error = None,
             blueprint_definition = await self.config.get(
-                "flow-blueprint"
-            ).get(msg.blueprint_name),
+                "flow-blueprint", msg.blueprint_name
+            ),
         )

     async def handle_put_blueprint(self, msg):

-        await self.config.get("flow-blueprint").put(
+        await self.config.put(
+            "flow-blueprint",
             msg.blueprint_name, msg.blueprint_definition
         )

-        await self.config.inc_version()
-
-        await self.config.push(types=["flow-blueprint"])
-
         return FlowResponse(
             error = None,
         )
@@ -136,11 +145,7 @@ class FlowConfig:

         logger.debug(f"Flow config message: {msg}")

-        await self.config.get("flow-blueprint").delete(msg.blueprint_name)
-
-        await self.config.inc_version()
-
-        await self.config.push(types=["flow-blueprint"])
+        await self.config.delete("flow-blueprint", msg.blueprint_name)

         return FlowResponse(
             error = None,
@@ -148,7 +153,7 @@ class FlowConfig:

     async def handle_list_flows(self, msg):

-        names = list(await self.config.get("flow").keys())
+        names = list(await self.config.keys("flow"))

         return FlowResponse(
             error = None,
@@ -157,7 +162,7 @@ class FlowConfig:

     async def handle_get_flow(self, msg):

-        flow_data = await self.config.get("flow").get(msg.flow_id)
+        flow_data = await self.config.get("flow", msg.flow_id)
         flow = json.loads(flow_data)

         return FlowResponse(
@@ -175,17 +180,17 @@ class FlowConfig:
         if msg.flow_id is None:
             raise RuntimeError("No flow ID")

-        if msg.flow_id in await self.config.get("flow").keys():
+        if msg.flow_id in await self.config.keys("flow"):
             raise RuntimeError("Flow already exists")

         if msg.description is None:
             raise RuntimeError("No description")

-        if msg.blueprint_name not in await self.config.get("flow-blueprint").keys():
+        if msg.blueprint_name not in await self.config.keys("flow-blueprint"):
             raise RuntimeError("Blueprint does not exist")

         cls = json.loads(
-            await self.config.get("flow-blueprint").get(msg.blueprint_name)
+            await self.config.get("flow-blueprint", msg.blueprint_name)
         )

         # Resolve parameters by merging user-provided values with defaults
@@ -210,6 +215,15 @@ class FlowConfig:

             return result

+        # Pre-create flow-level queues so the data path is wired
+        # before processors receive their config and start connecting.
+        queues = self._collect_flow_queues(cls, repl_template_with_params)
+        for topic, subscription in queues:
+            await self.pubsub.create_queue(topic, subscription)
+
+        # Build all processor config updates, then write in a single batch.
+        updates = []
+
         for kind in ("blueprint", "flow"):

             for k, v in cls[kind].items():
@@ -218,33 +232,30 @@ class FlowConfig:

                 variant = repl_template_with_params(variant)

-                v = {
-                    repl_template_with_params(k2): repl_template_with_params(v2)
-                    for k2, v2 in v.items()
-                }
+                topics = {
+                    repl_template_with_params(k2): repl_template_with_params(v2)
+                    for k2, v2 in v.get("topics", {}).items()
+                }

-                flac = await self.config.get("active-flow").get(processor)
-                if flac is not None:
-                    target = json.loads(flac)
-                else:
-                    target = {}
+                params = {
+                    repl_template_with_params(k2): repl_template_with_params(v2)
+                    for k2, v2 in v.get("parameters", {}).items()
+                }

-                # The condition if variant not in target: means it only adds
-                # the configuration if the variant doesn't already exist.
-                # If "everything" already exists in the target with old
-                # values, they won't update.
+                entry = {
+                    "topics": topics,
+                    "parameters": params,
+                }

-                if variant not in target:
-                    target[variant] = v
+                updates.append((
+                    f"processor:{processor}",
+                    variant,
+                    json.dumps(entry),
+                ))

-                await self.config.get("active-flow").put(
-                    processor, json.dumps(target)
-                )
+        await self.config.put_many(updates)

         def repl_interface(i):
             if isinstance(i, str):
                 return repl_template_with_params(i)
             else:
                 return {
                     k: repl_template_with_params(v)
                     for k, v in i.items()
@@ -258,8 +269,8 @@ class FlowConfig:
         else:
             interfaces = {}

-        await self.config.get("flow").put(
-            msg.flow_id,
+        await self.config.put(
+            "flow", msg.flow_id,
             json.dumps({
                 "description": msg.description,
                 "blueprint-name": msg.blueprint_name,
@@ -268,23 +279,131 @@ class FlowConfig:
                 })
             )

-        await self.config.inc_version()
-
-        await self.config.push(types=["active-flow", "flow"])
-
         return FlowResponse(
             error = None,
         )

+    async def ensure_existing_flow_queues(self):
+        """Ensure queues exist for all already-running flows.
+
+        Called on startup to handle flows that were started before this
+        version of the flow service was deployed, or before a restart.
+        """
+        flow_ids = await self.config.keys("flow")
+
+        for flow_id in flow_ids:
+            try:
+                flow_data = await self.config.get("flow", flow_id)
+                if flow_data is None:
+                    continue
+
+                flow = json.loads(flow_data)
+
+                blueprint_name = flow.get("blueprint-name")
+                if blueprint_name is None:
+                    continue
+
+                # Skip flows that are mid-shutdown
+                if flow.get("status") == "stopping":
+                    continue
+
+                parameters = flow.get("parameters", {})
+
+                blueprint_data = await self.config.get(
+                    "flow-blueprint", blueprint_name
+                )
+                if blueprint_data is None:
+                    logger.warning(
+                        f"Blueprint '{blueprint_name}' not found for "
+                        f"flow '{flow_id}', skipping queue creation"
+                    )
+                    continue
+
+                cls = json.loads(blueprint_data)
+
+                def repl_template(tmp):
+                    result = tmp.replace(
+                        "{blueprint}", blueprint_name
+                    ).replace(
+                        "{id}", flow_id
+                    )
+                    for param_name, param_value in parameters.items():
+                        result = result.replace(
+                            f"{{{param_name}}}", str(param_value)
+                        )
+                    return result
+
+                queues = self._collect_flow_queues(cls, repl_template)
+                for topic, subscription in queues:
+                    await self.pubsub.ensure_queue(topic, subscription)
+
+                logger.info(
+                    f"Ensured queues for existing flow '{flow_id}'"
+                )
+
+            except Exception as e:
+                logger.error(
+                    f"Failed to ensure queues for flow '{flow_id}': {e}"
+                )
+
+    def _collect_flow_queues(self, cls, repl_template):
+        """Collect (topic, subscription) pairs for all flow-level queues.
+
+        Iterates the blueprint's "flow" section and reads only the
+        "topics" dict from each processor entry.
+        """
+        queues = []
+
+        for k, v in cls["flow"].items():
+            processor, variant = k.split(":", 1)
+            variant = repl_template(variant)
+
+            for spec_name, topic_template in v.get("topics", {}).items():
+                topic = repl_template(topic_template)
+                subscription = f"{processor}--{variant}--{spec_name}"
+                queues.append((topic, subscription))
+
+        return queues
+
+    async def _delete_queues(self, queues):
+        """Delete queues with retries. Best-effort — logs failures but
+        does not raise."""
+        for attempt in range(DELETE_RETRIES):
+            remaining = []
+
+            for topic, subscription in queues:
+                try:
+                    await self.pubsub.delete_queue(topic, subscription)
+                except Exception as e:
+                    logger.warning(
+                        f"Queue delete failed (attempt {attempt + 1}/"
+                        f"{DELETE_RETRIES}): {topic}: {e}"
+                    )
+                    remaining.append((topic, subscription))
+
+            if not remaining:
+                return
+
+            queues = remaining
+
+            if attempt < DELETE_RETRIES - 1:
+                await asyncio.sleep(DELETE_RETRY_DELAY)
+
+        for topic, subscription in queues:
+            logger.error(
+                f"Failed to delete queue after {DELETE_RETRIES} "
+                f"attempts: {topic}"
+            )
+
     async def handle_stop_flow(self, msg):

         if msg.flow_id is None:
             raise RuntimeError("No flow ID")

-        if msg.flow_id not in await self.config.get("flow").keys():
+        if msg.flow_id not in await self.config.keys("flow"):
             raise RuntimeError("Flow ID invalid")

-        flow = json.loads(await self.config.get("flow").get(msg.flow_id))
+        flow = json.loads(await self.config.get("flow", msg.flow_id))

         if "blueprint-name" not in flow:
             raise RuntimeError("Internal error: flow has no flow blueprint")
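The `repl_template` helper in the hunk above substitutes `{blueprint}`, `{id}`, and per-parameter placeholders into queue-name templates. A minimal standalone sketch of the same substitution (the template string and values here are hypothetical examples, not from the blueprint schema):

```python
# Substitute "{blueprint}", "{id}", and "{<param>}" placeholders,
# mirroring the repl_template closure in ensure_existing_flow_queues.
def repl_template(tmp, blueprint_name, flow_id, parameters):
    result = tmp.replace("{blueprint}", blueprint_name).replace("{id}", flow_id)
    for param_name, param_value in parameters.items():
        result = result.replace(f"{{{param_name}}}", str(param_value))
    return result

# Hypothetical topic template with one user parameter.
topic = repl_template(
    "{blueprint}-{id}-chunk-{size}",
    "standard", "flow1", {"size": 512},
)
```

Substitution is plain string replacement, so parameter values are stringified before insertion, just as in the diff.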
@@ -292,7 +411,9 @@ class FlowConfig:
         blueprint_name = flow["blueprint-name"]
         parameters = flow.get("parameters", {})

-        cls = json.loads(await self.config.get("flow-blueprint").get(blueprint_name))
+        cls = json.loads(
+            await self.config.get("flow-blueprint", blueprint_name)
+        )

         def repl_template(tmp):
             result = tmp.replace(
@@ -305,34 +426,33 @@ class FlowConfig:
                 result = result.replace(f"{{{param_name}}}", str(param_value))
             return result

-        for kind in ("flow",):
-
-            for k, v in cls[kind].items():
-
-                processor, variant = k.split(":", 1)
-
-                variant = repl_template(variant)
-
-                flac = await self.config.get("active-flow").get(processor)
-
-                if flac is not None:
-                    target = json.loads(flac)
-                else:
-                    target = {}
-
-                if variant in target:
-                    del target[variant]
-
-                await self.config.get("active-flow").put(
-                    processor, json.dumps(target)
-                )
+        # Collect queue identifiers before removing config
+        queues = self._collect_flow_queues(cls, repl_template)

-        if msg.flow_id in await self.config.get("flow").keys():
-            await self.config.get("flow").delete(msg.flow_id)
+        # Phase 1: Set status to "stopping" and remove processor config.
+        # The config push tells processors to shut down their consumers.
+        flow["status"] = "stopping"
+        await self.config.put(
+            "flow", msg.flow_id, json.dumps(flow)
+        )

-        await self.config.inc_version()
+        # Delete all processor config entries for this flow.
+        deletes = []

-        await self.config.push(types=["active-flow", "flow"])
+        for k, v in cls["flow"].items():
+
+            processor, variant = k.split(":", 1)
+            variant = repl_template(variant)
+
+            deletes.append((f"processor:{processor}", variant))
+
+        await self.config.delete_many(deletes)
+
+        # Phase 2: Delete queues with retries, then remove the flow record.
+        await self._delete_queues(queues)
+
+        if msg.flow_id in await self.config.keys("flow"):
+            await self.config.delete("flow", msg.flow_id)

         return FlowResponse(
             error = None,
@@ -368,4 +488,3 @@ class FlowConfig:
             )

         return resp
-
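The hunks above replace the per-processor active-flow blob with one config entry per processor:variant, written with `put_many` and removed with `delete_many`. A minimal sketch of why per-variant keys avoid the read-modify-write hazard, using a hypothetical in-memory store (`ConfigStore` and its methods are illustrative stand-ins, not the real config client API):

```python
# Hypothetical in-memory stand-in for the config store. put_many /
# delete_many mirror the batched calls in the diff above.
class ConfigStore:
    def __init__(self):
        self.data = {}

    def put_many(self, updates):
        # One write per (type, key) pair: no fetch/parse/merge of a
        # shared per-processor JSON blob.
        for type_, key, value in updates:
            self.data[(type_, key)] = value

    def delete_many(self, deletes):
        for type_, key in deletes:
            self.data.pop((type_, key), None)

store = ConfigStore()

# Starting a flow writes one entry per processor:variant, in one batch.
store.put_many([
    ("processor:chunker", "default", '{"topics": {}, "parameters": {}}'),
    ("processor:chunker", "flow2", '{"topics": {}, "parameters": {}}'),
])

# Stopping "flow2" deletes only its own entry; "default" is untouched.
store.delete_many([("processor:chunker", "flow2")])
```

Because each variant owns its own key, two concurrent flow operations on the same processor touch disjoint keys instead of racing over one blob.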
trustgraph-flow/trustgraph/flow/service/service.py (new file, +162)
@@ -0,0 +1,162 @@
+
+"""
+Flow service. Manages flow lifecycle — starting and stopping flows
+by coordinating with the config service via pub/sub.
+"""
+
+import logging
+
+from trustgraph.schema import Error
+from trustgraph.schema import FlowRequest, FlowResponse
+from trustgraph.schema import flow_request_queue, flow_response_queue
+from trustgraph.schema import ConfigRequest, ConfigResponse
+from trustgraph.schema import config_request_queue, config_response_queue
+
+from trustgraph.base import AsyncProcessor, Consumer, Producer
+from trustgraph.base import ConsumerMetrics, ProducerMetrics, SubscriberMetrics
+from trustgraph.base import ConfigClient
+
+from . flow import FlowConfig
+
+# Module logger
+logger = logging.getLogger(__name__)
+
+default_ident = "flow-svc"
+
+default_flow_request_queue = flow_request_queue
+default_flow_response_queue = flow_response_queue
+
+
+class Processor(AsyncProcessor):
+
+    def __init__(self, **params):
+
+        flow_request_queue = params.get(
+            "flow_request_queue", default_flow_request_queue
+        )
+        flow_response_queue = params.get(
+            "flow_response_queue", default_flow_response_queue
+        )
+
+        id = params.get("id")
+
+        super(Processor, self).__init__(
+            **params | {
+                "flow_request_schema": FlowRequest.__name__,
+                "flow_response_schema": FlowResponse.__name__,
+            }
+        )
+
+        flow_request_metrics = ConsumerMetrics(
+            processor = self.id, flow = None, name = "flow-request"
+        )
+        flow_response_metrics = ProducerMetrics(
+            processor = self.id, flow = None, name = "flow-response"
+        )
+
+        self.flow_request_topic = flow_request_queue
+        self.flow_request_subscriber = id
+
+        self.flow_request_consumer = Consumer(
+            taskgroup = self.taskgroup,
+            backend = self.pubsub,
+            flow = None,
+            topic = flow_request_queue,
+            subscriber = id,
+            schema = FlowRequest,
+            handler = self.on_flow_request,
+            metrics = flow_request_metrics,
+        )
+
+        self.flow_response_producer = Producer(
+            backend = self.pubsub,
+            topic = flow_response_queue,
+            schema = FlowResponse,
+            metrics = flow_response_metrics,
+        )
+
+        config_req_metrics = ProducerMetrics(
+            processor=self.id, flow=None, name="config-request",
+        )
+        config_resp_metrics = SubscriberMetrics(
+            processor=self.id, flow=None, name="config-response",
+        )
+
+        self.config_client = ConfigClient(
+            backend=self.pubsub,
+            subscription=f"{self.id}--config--{id}",
+            consumer_name=self.id,
+            request_topic=config_request_queue,
+            request_schema=ConfigRequest,
+            request_metrics=config_req_metrics,
+            response_topic=config_response_queue,
+            response_schema=ConfigResponse,
+            response_metrics=config_resp_metrics,
+        )
+
+        self.flow = FlowConfig(self.config_client, self.pubsub)
+
+        logger.info("Flow service initialized")
+
+    async def start(self):
+
+        await self.pubsub.ensure_queue(
+            self.flow_request_topic, self.flow_request_subscriber
+        )
+        await self.config_client.start()
+        await self.flow.ensure_existing_flow_queues()
+        await self.flow_request_consumer.start()
+
+    async def on_flow_request(self, msg, consumer, flow):
+
+        try:
+
+            v = msg.value()
+
+            # Sender-produced ID
+            id = msg.properties()["id"]
+
+            logger.debug(f"Handling flow request {id}...")
+
+            resp = await self.flow.handle(v)
+
+            await self.flow_response_producer.send(
+                resp, properties={"id": id}
+            )
+
+        except Exception as e:
+
+            logger.error(f"Flow request failed: {e}")
+
+            resp = FlowResponse(
+                error=Error(
+                    type = "flow-error",
+                    message = str(e),
+                ),
+            )
+
+            await self.flow_response_producer.send(
+                resp, properties={"id": id}
+            )
+
+    @staticmethod
+    def add_args(parser):
+
+        AsyncProcessor.add_args(parser)
+
+        parser.add_argument(
+            '--flow-request-queue',
+            default=default_flow_request_queue,
+            help=f'Flow request queue (default: {default_flow_request_queue})'
+        )
+
+        parser.add_argument(
+            '--flow-response-queue',
+            default=default_flow_response_queue,
+            help=f'Flow response queue {default_flow_response_queue}',
+        )
+
+
+def run():
+
+    Processor.launch(default_ident, __doc__)
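`_delete_queues` in flow.py retries best-effort deletion with a fixed delay between rounds. The same pattern in isolation, runnable standalone (the delay is shortened and `flaky_delete` is a hypothetical stand-in for the pub/sub backend):

```python
import asyncio

DELETE_RETRIES = 5
DELETE_RETRY_DELAY = 0.01  # shortened here for illustration

async def delete_with_retries(queues, delete_queue):
    # Retry only the queues that failed; give up after DELETE_RETRIES.
    for attempt in range(DELETE_RETRIES):
        remaining = []
        for q in queues:
            try:
                await delete_queue(q)
            except Exception:
                remaining.append(q)
        if not remaining:
            return []
        queues = remaining
        if attempt < DELETE_RETRIES - 1:
            await asyncio.sleep(DELETE_RETRY_DELAY)
    return queues  # queues that never deleted

# A flaky backend that fails the first two attempts for one queue.
calls = {}
async def flaky_delete(q):
    calls[q] = calls.get(q, 0) + 1
    if q == "q2" and calls[q] < 3:
        raise RuntimeError("broker busy")

failed = asyncio.run(delete_with_retries(["q1", "q2"], flaky_delete))
```

Unlike the method in the diff, this sketch returns the undeletable queues instead of logging them, so the outcome is easy to inspect.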
@@ -54,7 +54,7 @@ class ConfigReceiver:
             return

         # Gateway cares about flow config
-        if notify_types and "flow" not in notify_types and "active-flow" not in notify_types:
+        if notify_types and "flow" not in notify_types:
             logger.debug(
                 f"Ignoring config notify v{notify_version}, "
                 f"no flow types in {notify_types}"
@@ -226,7 +226,7 @@ class DispatcherManager:
             raise RuntimeError("This kind not supported by flow")

         # FIXME: The -store bit, does it make sense?
-        qconfig = intf_defs[int_kind]
+        qconfig = intf_defs[int_kind]["flow"]

         id = str(uuid.uuid4())
         dispatcher = import_dispatchers[kind](
@@ -264,7 +264,7 @@ class DispatcherManager:
        if int_kind not in intf_defs:
            raise RuntimeError("This kind not supported by flow")

-        qconfig = intf_defs[int_kind]
+        qconfig = intf_defs[int_kind]["flow"]

        id = str(uuid.uuid4())
        dispatcher = export_dispatchers[kind](
@@ -320,7 +320,7 @@ class DispatcherManager:
         elif kind in sender_dispatchers:
             dispatcher = sender_dispatchers[kind](
                 backend = self.backend,
-                queue = qconfig,
+                queue = qconfig["flow"],
             )
         else:
             raise RuntimeError("Invalid kind")
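The dispatcher hunks above now read queue config from a `"flow"` sub-key of each interface definition rather than from the entry itself. A tiny sketch of the assumed shape (the interface name and queue value below are hypothetical examples):

```python
# Assumed new interface-definition shape: each entry nests its queue
# config under a "flow" key.
intf_defs = {
    "document-load": {
        "flow": "document-load:flow1",
    },
}

int_kind = "document-load"
qconfig = intf_defs[int_kind]["flow"]  # was: intf_defs[int_kind]
```

Callers that previously passed the whole entry as the queue now index into `"flow"` first, which is what every `["flow"]` addition in these hunks does.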
@@ -162,6 +162,9 @@ class Processor(AsyncProcessor):
             processor = self.id, flow = None, name = "storage-response"
         )

+        self.librarian_request_topic = librarian_request_queue
+        self.librarian_request_subscriber = id
+
         self.librarian_request_consumer = Consumer(
             taskgroup = self.taskgroup,
             backend = self.pubsub,
@@ -180,6 +183,9 @@ class Processor(AsyncProcessor):
             metrics = librarian_response_metrics,
         )

+        self.collection_request_topic = collection_request_queue
+        self.collection_request_subscriber = id
+
         self.collection_request_consumer = Consumer(
             taskgroup = self.taskgroup,
             backend = self.pubsub,
@@ -248,7 +254,7 @@ class Processor(AsyncProcessor):

         self.register_config_handler(
             self.on_librarian_config,
-            types=["flow", "active-flow"],
+            types=["flow"],
         )

         self.flows = {}
@@ -257,6 +263,12 @@ class Processor(AsyncProcessor):

     async def start(self):

+        await self.pubsub.ensure_queue(
+            self.librarian_request_topic, self.librarian_request_subscriber
+        )
+        await self.pubsub.ensure_queue(
+            self.collection_request_topic, self.collection_request_subscriber
+        )
         await super(Processor, self).start()
         await self.librarian_request_consumer.start()
         await self.librarian_response_producer.start()
@@ -365,12 +377,12 @@ class Processor(AsyncProcessor):
         else:
             kind = "document-load"

-        q = flow["interfaces"][kind]
+        q = flow["interfaces"][kind]["flow"]

         # Emit document provenance to knowledge graph
         if "triples-store" in flow["interfaces"]:
             await self.emit_document_provenance(
-                document, processing, flow["interfaces"]["triples-store"]
+                document, processing, flow["interfaces"]["triples-store"]["flow"]
             )

         if kind == "text-load":