mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 00:16:23 +02:00
Fix/get multiple flows working (#355)
* Reduce log output * Fix problems
This commit is contained in:
parent
9508ac6c69
commit
027b52cd7c
8 changed files with 19 additions and 10 deletions
|
|
@ -114,7 +114,7 @@ class AsyncProcessor:
|
|||
version = message.value().version
|
||||
|
||||
# Invoke message handlers
|
||||
print("Config change event", config, version, flush=True)
|
||||
print("Config change event", version, flush=True)
|
||||
for ch in self.config_handlers:
|
||||
await ch(config, version)
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class ConsumerSpec(Spec):
|
|||
flow = flow,
|
||||
client = processor.pulsar_client,
|
||||
topic = definition[self.name],
|
||||
subscriber = processor.id + "--" + self.name,
|
||||
subscriber = processor.id + "--" + flow.name + "--" + self.name,
|
||||
schema = self.schema,
|
||||
handler = self.handler,
|
||||
metrics = consumer_metrics,
|
||||
|
|
|
|||
|
|
@ -75,7 +75,8 @@ class FlowProcessor(AsyncProcessor):
|
|||
# Get list of flows which should be running and are currently
|
||||
# running
|
||||
wanted_flows = flow_config.keys()
|
||||
current_flows = self.flows.keys()
|
||||
# This takes a copy, needed because dict gets modified by stop_flow
|
||||
current_flows = list(self.flows.keys())
|
||||
|
||||
# Start all the flows which arent currently running
|
||||
for flow in wanted_flows:
|
||||
|
|
|
|||
|
|
@ -127,7 +127,9 @@ class RequestResponseSpec(Spec):
|
|||
|
||||
rr = self.impl(
|
||||
client = processor.pulsar_client,
|
||||
subscription = flow.id,
|
||||
subscription = (
|
||||
processor.id + "--" + flow.name + "--" + self.request_name
|
||||
),
|
||||
consumer_name = flow.id,
|
||||
request_topic = definition[self.request_name],
|
||||
request_schema = self.request_schema,
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class Subscriber:
|
|||
self.running = True
|
||||
self.metrics = metrics
|
||||
|
||||
async def __del__(self):
|
||||
def __del__(self):
|
||||
self.running = False
|
||||
|
||||
async def start(self):
|
||||
|
|
|
|||
|
|
@ -27,6 +27,8 @@ class FlowConfig:
|
|||
|
||||
self.config["flow-classes"][msg.class_name] = msg.class_definition
|
||||
|
||||
self.config.version += 1
|
||||
|
||||
await self.config.push()
|
||||
|
||||
return FlowResponse(
|
||||
|
|
@ -39,6 +41,8 @@ class FlowConfig:
|
|||
|
||||
del self.config["flow-classes"][msg.class_name]
|
||||
|
||||
self.config.version += 1
|
||||
|
||||
await self.config.push()
|
||||
|
||||
return FlowResponse(
|
||||
|
|
@ -135,6 +139,8 @@ class FlowConfig:
|
|||
"interfaces": interfaces,
|
||||
})
|
||||
|
||||
self.config.version += 1
|
||||
|
||||
await self.config.push()
|
||||
|
||||
return FlowResponse(
|
||||
|
|
@ -186,6 +192,8 @@ class FlowConfig:
|
|||
if msg.flow_id in self.config["flows"]:
|
||||
del self.config["flows"][msg.flow_id]
|
||||
|
||||
self.config.version += 1
|
||||
|
||||
await self.config.push()
|
||||
|
||||
return FlowResponse(
|
||||
|
|
|
|||
|
|
@ -173,7 +173,7 @@ class Processor(FlowProcessor):
|
|||
o=Value(value=v.metadata.id, is_uri=True)
|
||||
))
|
||||
|
||||
await self.emit_edges(
|
||||
await self.emit_triples(
|
||||
flow("triples"),
|
||||
Metadata(
|
||||
id=v.metadata.id,
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ class Processor(FlowProcessor):
|
|||
for k, v in config.items()
|
||||
}
|
||||
|
||||
def get_prices(self, prices, modelname):
|
||||
def get_prices(self, modelname):
|
||||
|
||||
if modelname in self.prices:
|
||||
model = self.prices[modelname]
|
||||
|
|
@ -90,9 +90,7 @@ class Processor(FlowProcessor):
|
|||
__class__.input_token_metric.inc(num_in)
|
||||
__class__.output_token_metric.inc(num_out)
|
||||
|
||||
model_input_price, model_output_price = self.get_prices(
|
||||
price_list, modelname
|
||||
)
|
||||
model_input_price, model_output_price = self.get_prices(modelname)
|
||||
|
||||
if model_input_price == None:
|
||||
cost_per_call = f"Model Not Found in Price list"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue