diff --git a/trustgraph-base/trustgraph/base/async_processor.py b/trustgraph-base/trustgraph/base/async_processor.py index bdf9a0bb..ba1d4e1a 100644 --- a/trustgraph-base/trustgraph/base/async_processor.py +++ b/trustgraph-base/trustgraph/base/async_processor.py @@ -114,7 +114,7 @@ class AsyncProcessor: version = message.value().version # Invoke message handlers - print("Config change event", config, version, flush=True) + print("Config change event", version, flush=True) for ch in self.config_handlers: await ch(config, version) diff --git a/trustgraph-base/trustgraph/base/consumer_spec.py b/trustgraph-base/trustgraph/base/consumer_spec.py index 21497dc5..93665476 100644 --- a/trustgraph-base/trustgraph/base/consumer_spec.py +++ b/trustgraph-base/trustgraph/base/consumer_spec.py @@ -20,7 +20,7 @@ class ConsumerSpec(Spec): flow = flow, client = processor.pulsar_client, topic = definition[self.name], - subscriber = processor.id + "--" + self.name, + subscriber = processor.id + "--" + flow.name + "--" + self.name, schema = self.schema, handler = self.handler, metrics = consumer_metrics, diff --git a/trustgraph-base/trustgraph/base/flow_processor.py b/trustgraph-base/trustgraph/base/flow_processor.py index e6460fe3..fdeb5950 100644 --- a/trustgraph-base/trustgraph/base/flow_processor.py +++ b/trustgraph-base/trustgraph/base/flow_processor.py @@ -75,7 +75,8 @@ class FlowProcessor(AsyncProcessor): # Get list of flows which should be running and are currently # running wanted_flows = flow_config.keys() - current_flows = self.flows.keys() + # This takes a copy, needed because dict gets modified by stop_flow + current_flows = list(self.flows.keys()) # Start all the flows which arent currently running for flow in wanted_flows: diff --git a/trustgraph-base/trustgraph/base/request_response_spec.py b/trustgraph-base/trustgraph/base/request_response_spec.py index 88ee4563..7b8b1be8 100644 --- a/trustgraph-base/trustgraph/base/request_response_spec.py +++ b/trustgraph-base/trustgraph/base/request_response_spec.py @@ -127,7 +127,9 @@ class RequestResponseSpec(Spec): rr = self.impl( client = processor.pulsar_client, - subscription = flow.id, + subscription = ( + processor.id + "--" + flow.name + "--" + self.request_name + ), consumer_name = flow.id, request_topic = definition[self.request_name], request_schema = self.request_schema, diff --git a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py index 1cf263d4..127d2add 100644 --- a/trustgraph-base/trustgraph/base/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -20,7 +20,7 @@ class Subscriber: self.running = True self.metrics = metrics - async def __del__(self): + def __del__(self): self.running = False async def start(self): diff --git a/trustgraph-flow/trustgraph/config/service/flow.py b/trustgraph-flow/trustgraph/config/service/flow.py index 3933e4aa..4d351f59 100644 --- a/trustgraph-flow/trustgraph/config/service/flow.py +++ b/trustgraph-flow/trustgraph/config/service/flow.py @@ -27,6 +27,8 @@ class FlowConfig: self.config["flow-classes"][msg.class_name] = msg.class_definition + self.config.version += 1 + await self.config.push() return FlowResponse( @@ -39,6 +41,8 @@ class FlowConfig: del self.config["flow-classes"][msg.class_name] + self.config.version += 1 + await self.config.push() return FlowResponse( @@ -135,6 +139,8 @@ class FlowConfig: "interfaces": interfaces, }) + self.config.version += 1 + await self.config.push() return FlowResponse( @@ -186,6 +192,8 @@ class FlowConfig: if msg.flow_id in self.config["flows"]: del self.config["flows"][msg.flow_id] + self.config.version += 1 + await self.config.push() return FlowResponse( diff --git a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py index ac2929a3..63670a7d 100755 --- a/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/relationships/extract.py @@ -173,7 +173,7 @@ class Processor(FlowProcessor): o=Value(value=v.metadata.id, is_uri=True) )) - await self.emit_edges( + await self.emit_triples( flow("triples"), Metadata( id=v.metadata.id, diff --git a/trustgraph-flow/trustgraph/metering/counter.py b/trustgraph-flow/trustgraph/metering/counter.py index 12d5683e..cb57d8af 100644 --- a/trustgraph-flow/trustgraph/metering/counter.py +++ b/trustgraph-flow/trustgraph/metering/counter.py @@ -72,7 +72,7 @@ class Processor(FlowProcessor): for k, v in config.items() } - def get_prices(self, prices, modelname): + def get_prices(self, modelname): if modelname in self.prices: model = self.prices[modelname] @@ -90,9 +90,7 @@ class Processor(FlowProcessor): __class__.input_token_metric.inc(num_in) __class__.output_token_metric.inc(num_out) - model_input_price, model_output_price = self.get_prices( - price_list, modelname - ) + model_input_price, model_output_price = self.get_prices(modelname) if model_input_price == None: cost_per_call = f"Model Not Found in Price list"