diff --git a/tests/unit/test_gateway/test_config_receiver.py b/tests/unit/test_gateway/test_config_receiver.py index ee500766..803ff4c6 100644 --- a/tests/unit/test_gateway/test_config_receiver.py +++ b/tests/unit/test_gateway/test_config_receiver.py @@ -64,7 +64,7 @@ class TestConfigReceiver: mock_msg.value.return_value = Mock( version="1.0", config={ - "flows": { + "flow": { "flow1": '{"name": "test_flow_1", "steps": []}', "flow2": '{"name": "test_flow_2", "steps": []}' } @@ -109,7 +109,7 @@ class TestConfigReceiver: mock_msg.value.return_value = Mock( version="1.0", config={ - "flows": { + "flow": { "flow1": '{"name": "test_flow_1", "steps": []}' } } @@ -352,7 +352,7 @@ class TestConfigReceiver: mock_msg.value.return_value = Mock( version="1.0", config={ - "flows": { + "flow": { "flow2": '{"name": "test_flow_2", "steps": []}', "flow3": '{"name": "test_flow_3", "steps": []}' } @@ -393,7 +393,7 @@ class TestConfigReceiver: mock_msg.value.return_value = Mock( version="1.0", config={ - "flows": { + "flow": { "flow1": '{"invalid": json}', # Invalid JSON "flow2": '{"name": "valid_flow", "steps": []}' # Valid JSON } diff --git a/trustgraph-flow/trustgraph/gateway/config/receiver.py b/trustgraph-flow/trustgraph/gateway/config/receiver.py index bdd123a9..4bf39ccd 100755 --- a/trustgraph-flow/trustgraph/gateway/config/receiver.py +++ b/trustgraph-flow/trustgraph/gateway/config/receiver.py @@ -53,22 +53,20 @@ class ConfigReceiver: logger.info(f"Config version: {v.version}") - if "flows" in v.config: + flows = v.config.get("flow", {}) - flows = v.config["flows"] + wanted = list(flows.keys()) + current = list(self.flows.keys()) - wanted = list(flows.keys()) - current = list(self.flows.keys()) + for k in wanted: + if k not in current: + self.flows[k] = json.loads(flows[k]) + await self.start_flow(k, self.flows[k]) - for k in wanted: - if k not in current: - self.flows[k] = json.loads(flows[k]) - await self.start_flow(k, self.flows[k]) - - for k in current: - if k not in wanted: - await self.stop_flow(k, self.flows[k]) - del self.flows[k] + for k in current: + if k not in wanted: + await self.stop_flow(k, self.flows[k]) + del self.flows[k] except Exception as e: logger.error(f"Config processing exception: {e}", exc_info=True)