mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-26 00:46:22 +02:00
feat: separate flow service from config service with explicit queue lifecycle management
The flow service is now an independent service that owns the lifecycle
of flow and blueprint queues. System services own their own queues.
Consumers never create queues.
Flow service separation:
- New service at trustgraph-flow/trustgraph/flow/service/
- Uses async ConfigClient (RequestResponse pattern) to talk to config
service
- Config service stripped of all flow handling
Queue lifecycle management:
- PubSubBackend protocol gains create_queue, delete_queue,
queue_exists, ensure_queue — all async
- RabbitMQ: implements via pika with asyncio.to_thread internally
- Pulsar: stubs for future admin REST API implementation
- Consumer _connect() no longer creates queues (passive=True for named
queues)
- System services call ensure_queue on startup
- Flow service creates queues on flow start, deletes on flow stop
- Flow service ensures queues for pre-existing flows on startup
Two-phase flow stop:
- Phase 1: set flow status to "stopping", delete processor config
entries
- Phase 2: retry queue deletion, then delete flow record
Config restructure:
- active-flow config replaced with processor:{name} types
- Each processor has its own config type, each flow variant is a key
- Flow start/stop use batch put/delete — single config push per
operation
- FlowProcessor subscribes to its own type only
Blueprint format:
- Processor entries split into topics and parameters dicts
- Flow interfaces use {"flow": "topic"} instead of bare strings
- Specs (ConsumerSpec, ProducerSpec, etc.) read from
definition["topics"]
Tests updated
80 lines
2.6 KiB
Python
80 lines
2.6 KiB
Python
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
from trustgraph.base.flow import Flow
|
|
from trustgraph.base.parameter_spec import Parameter, ParameterSpec
|
|
from trustgraph.base.spec import Spec
|
|
|
|
|
|
def test_parameter_spec_is_a_spec_and_adds_parameter_value():
    """ParameterSpec is a Spec, and add() registers a Parameter on the flow.

    The definition carries the value under the "parameters" key; after
    add() the flow's parameter mapping must hold a Parameter whose value
    matches the configured one.
    """
    temperature_spec = ParameterSpec("temperature")
    assert isinstance(temperature_spec, Spec)

    # The flow is mocked; only its ``parameter`` dict is a real object.
    mock_flow = MagicMock(parameter={})
    definition = {"parameters": {"temperature": 0.7}}

    temperature_spec.add(mock_flow, MagicMock(), definition)

    registered = mock_flow.parameter
    assert "temperature" in registered

    stored = registered["temperature"]
    assert isinstance(stored, Parameter)
    assert stored.value == 0.7
|
|
|
|
|
|
def test_parameter_spec_defaults_missing_values_to_none():
    """add() with an empty definition still registers the parameter, valued None."""
    mock_flow = MagicMock(parameter={})

    # Empty definition: no "parameters" section is supplied at all.
    ParameterSpec("model").add(mock_flow, MagicMock(), {})

    model_parameter = mock_flow.parameter["model"]
    assert model_parameter.value is None
|
|
|
|
|
|
def test_parameter_start_and_stop_are_awaitable():
    """Parameter.start() and Parameter.stop() can be awaited and resolve to None."""
    parameter = Parameter("value")

    # asyncio.run() only accepts a coroutine, so each call also checks
    # that the method is async.
    start_result = asyncio.run(parameter.start())
    assert start_result is None

    stop_result = asyncio.run(parameter.stop())
    assert stop_result is None
|
|
|
|
|
|
def test_flow_initialization_calls_registered_specs():
    """Constructing a Flow stores id/name and calls add() once on each spec.

    With mocked specs nothing is actually registered, so the producer,
    consumer and parameter mappings are expected to stay empty.
    """
    first_spec, second_spec = MagicMock(), MagicMock()
    processor = MagicMock(specifications=[first_spec, second_spec])
    definition = {"answer": 42}

    flow = Flow("processor-1", "flow-a", processor, definition)

    assert (flow.id, flow.name) == ("processor-1", "flow-a")

    for registry in (flow.producer, flow.consumer, flow.parameter):
        assert registry == {}

    # Every spec listed on the processor receives the same
    # (flow, processor, definition) triple exactly once.
    for spec in (first_spec, second_spec):
        spec.add.assert_called_once_with(flow, processor, {"answer": 42})
|
|
|
|
|
|
def test_flow_start_and_stop_visit_all_consumers():
    """Flow.start() and Flow.stop() call start()/stop() once on every consumer."""
    consumers = {"one": AsyncMock(), "two": AsyncMock()}

    flow = Flow("processor-1", "flow-a", MagicMock(specifications=[]), {})
    # Replace the consumer mapping wholesale with the async mocks.
    flow.consumer = consumers

    asyncio.run(flow.start())
    asyncio.run(flow.stop())

    for consumer in consumers.values():
        consumer.start.assert_called_once_with()
        consumer.stop.assert_called_once_with()
|
|
|
|
|
|
def test_flow_call_returns_values_in_priority_order():
    """Calling a Flow with a key resolves it across its three mappings.

    A key present in all of producer, consumer and parameter yields the
    producer entry; a consumer-only key yields the consumer entry; a
    parameter-only key yields the Parameter's value; an unknown key
    yields None.
    """
    flow = Flow("processor-1", "flow-a", MagicMock(specifications=[]), {})

    shared_key = "shared"
    flow.producer[shared_key] = "producer-value"
    for key in ("consumer-only", shared_key):
        flow.consumer[key] = "consumer-value"
    for key in ("parameter-only", shared_key):
        flow.parameter[key] = Parameter("parameter-value")

    expected_by_key = {
        "shared": "producer-value",
        "consumer-only": "consumer-value",
        "parameter-only": "parameter-value",
    }
    for key, expected in expected_by_key.items():
        assert flow(key) == expected

    assert flow("missing") is None
|