Flow class / flow blueprint

This commit is contained in:
Cyber MacGeddon 2026-01-14 11:11:52 +00:00
parent 99f17d1b9d
commit 95fd60b7bd
11 changed files with 118 additions and 116 deletions

View file

@ -63,13 +63,13 @@ class FlowProcessor(AsyncProcessor):
logger.info(f"Got config version {version}")
# Skip over invalid data
if "flows-active" not in config: return
if "active-flow" not in config: return
# Check there's configuration information for me
if self.id in config["flows-active"]:
if self.id in config["active-flow"]:
# Get my flow config
flow_config = json.loads(config["flows-active"][self.id])
flow_config = json.loads(config["active-flow"][self.id])
else:

View file

@ -9,8 +9,8 @@ class FlowRequestTranslator(MessageTranslator):
def to_pulsar(self, data: Dict[str, Any]) -> FlowRequest:
return FlowRequest(
operation=data.get("operation"),
class_name=data.get("class-name"),
class_definition=data.get("class-definition"),
blueprint_name=data.get("blueprint-name"),
blueprint_definition=data.get("blueprint-definition"),
description=data.get("description"),
flow_id=data.get("flow-id"),
parameters=data.get("parameters")
@ -21,10 +21,10 @@ class FlowRequestTranslator(MessageTranslator):
if obj.operation is not None:
result["operation"] = obj.operation
if obj.class_name is not None:
result["class-name"] = obj.class_name
if obj.class_definition is not None:
result["class-definition"] = obj.class_definition
if obj.blueprint_name is not None:
result["blueprint-name"] = obj.blueprint_name
if obj.blueprint_definition is not None:
result["blueprint-definition"] = obj.blueprint_definition
if obj.description is not None:
result["description"] = obj.description
if obj.flow_id is not None:
@ -44,12 +44,12 @@ class FlowResponseTranslator(MessageTranslator):
def from_pulsar(self, obj: FlowResponse) -> Dict[str, Any]:
result = {}
if obj.class_names is not None:
result["class-names"] = obj.class_names
if obj.blueprint_names is not None:
result["blueprint-names"] = obj.blueprint_names
if obj.flow_ids is not None:
result["flow-ids"] = obj.flow_ids
if obj.class_definition is not None:
result["class-definition"] = obj.class_definition
if obj.blueprint_definition is not None:
result["blueprint-definition"] = obj.blueprint_definition
if obj.flow is not None:
result["flow"] = obj.flow
if obj.description is not None:

View file

@ -7,27 +7,27 @@ from ..core.primitives import Error
############################################################################
# Flow service:
# list_classes() -> (classname[])
# get_class(classname) -> (class)
# put_class(class) -> (class)
# delete_class(classname) -> ()
# list_blueprints() -> (blueprintname[])
# get_blueprint(blueprintname) -> (blueprint)
# put_blueprint(blueprint) -> (blueprint)
# delete_blueprint(blueprintname) -> ()
#
# list_flows() -> (flowid[])
# get_flow(flowid) -> (flow)
# start_flow(flowid, classname) -> ()
# start_flow(flowid, blueprintname) -> ()
# stop_flow(flowid) -> ()
# Prompt services, abstract the prompt generation
@dataclass
class FlowRequest:
operation: str = "" # list-classes, get-class, put-class, delete-class
operation: str = "" # list-blueprints, get-blueprint, put-blueprint, delete-blueprint
# list-flows, get-flow, start-flow, stop-flow
# get_class, put_class, delete_class, start_flow
class_name: str = ""
# get_blueprint, put_blueprint, delete_blueprint, start_flow
blueprint_name: str = ""
# put_class
class_definition: str = ""
# put_blueprint
blueprint_definition: str = ""
# start_flow
description: str = ""
@ -40,14 +40,14 @@ class FlowRequest:
@dataclass
class FlowResponse:
# list_classes
class_names: list[str] = field(default_factory=list)
# list_blueprints
blueprint_names: list[str] = field(default_factory=list)
# list_flows
flow_ids: list[str] = field(default_factory=list)
# get_class
class_definition: str = ""
# get_blueprint
blueprint_definition: str = ""
# get_flow
flow: str = ""

View file

@ -29,13 +29,13 @@ Homepage = "https://github.com/trustgraph-ai/trustgraph"
[project.scripts]
tg-add-library-document = "trustgraph.cli.add_library_document:main"
tg-delete-flow-class = "trustgraph.cli.delete_flow_class:main"
tg-delete-flow-blueprint = "trustgraph.cli.delete_flow_blueprint:main"
tg-delete-mcp-tool = "trustgraph.cli.delete_mcp_tool:main"
tg-delete-kg-core = "trustgraph.cli.delete_kg_core:main"
tg-delete-tool = "trustgraph.cli.delete_tool:main"
tg-dump-msgpack = "trustgraph.cli.dump_msgpack:main"
tg-dump-queues = "trustgraph.cli.dump_queues:main"
tg-get-flow-class = "trustgraph.cli.get_flow_class:main"
tg-get-flow-blueprint = "trustgraph.cli.get_flow_blueprint:main"
tg-get-kg-core = "trustgraph.cli.get_kg_core:main"
tg-graph-to-turtle = "trustgraph.cli.graph_to_turtle:main"
tg-init-trustgraph = "trustgraph.cli.init_trustgraph:main"
@ -56,7 +56,7 @@ tg-load-text = "trustgraph.cli.load_text:main"
tg-load-turtle = "trustgraph.cli.load_turtle:main"
tg-load-knowledge = "trustgraph.cli.load_knowledge:main"
tg-load-structured-data = "trustgraph.cli.load_structured_data:main"
tg-put-flow-class = "trustgraph.cli.put_flow_class:main"
tg-put-flow-blueprint = "trustgraph.cli.put_flow_blueprint:main"
tg-put-kg-core = "trustgraph.cli.put_kg_core:main"
tg-remove-library-document = "trustgraph.cli.remove_library_document:main"
tg-save-doc-embeds = "trustgraph.cli.save_doc_embeds:main"
@ -65,7 +65,7 @@ tg-set-prompt = "trustgraph.cli.set_prompt:main"
tg-set-token-costs = "trustgraph.cli.set_token_costs:main"
tg-set-tool = "trustgraph.cli.set_tool:main"
tg-show-config = "trustgraph.cli.show_config:main"
tg-show-flow-classes = "trustgraph.cli.show_flow_classes:main"
tg-show-flow-blueprints = "trustgraph.cli.show_flow_blueprints:main"
tg-show-flow-state = "trustgraph.cli.show_flow_state:main"
tg-show-flows = "trustgraph.cli.show_flows:main"
tg-show-graph = "trustgraph.cli.show_graph:main"

View file

@ -1,5 +1,5 @@
"""
Deletes a flow class
Deletes a flow blueprint
"""
import argparse
@ -10,16 +10,16 @@ import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def delete_flow_class(url, class_name):
def delete_flow_blueprint(url, blueprint_name):
api = Api(url).flow()
class_names = api.delete_class(class_name)
blueprint_names = api.delete_blueprint(blueprint_name)
def main():
parser = argparse.ArgumentParser(
prog='tg-delete-flow-class',
prog='tg-delete-flow-blueprint',
description=__doc__,
)
@ -30,17 +30,17 @@ def main():
)
parser.add_argument(
'-n', '--class-name',
help=f'Flow class name',
'-n', '--blueprint-name',
help=f'Flow blueprint name',
)
args = parser.parse_args()
try:
delete_flow_class(
delete_flow_blueprint(
url=args.api_url,
class_name=args.class_name,
blueprint_name=args.blueprint_name,
)
except Exception as e:
@ -48,4 +48,4 @@ def main():
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()
main()

View file

@ -1,5 +1,5 @@
"""
Outputs a flow class definition in JSON format.
Outputs a flow blueprint definition in JSON format.
"""
import argparse
@ -10,18 +10,18 @@ import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def get_flow_class(url, class_name):
def get_flow_blueprint(url, blueprint_name):
api = Api(url).flow()
cls = api.get_class(class_name)
cls = api.get_blueprint(blueprint_name)
print(json.dumps(cls, indent=4))
def main():
parser = argparse.ArgumentParser(
prog='tg-get-flow-class',
prog='tg-get-flow-blueprint',
description=__doc__,
)
@ -32,18 +32,18 @@ def main():
)
parser.add_argument(
'-n', '--class-name',
'-n', '--blueprint-name',
required=True,
help=f'Flow class name',
help=f'Flow blueprint name',
)
args = parser.parse_args()
try:
get_flow_class(
get_flow_blueprint(
url=args.api_url,
class_name=args.class_name,
blueprint_name=args.blueprint_name,
)
except Exception as e:
@ -51,4 +51,4 @@ def main():
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()
main()

View file

@ -1,6 +1,6 @@
"""
Uploads a flow class definition. You can take the output of
tg-get-flow-class and load it back in using this utility.
Uploads a flow blueprint definition. You can take the output of
tg-get-flow-blueprint and load it back in using this utility.
"""
import argparse
@ -11,16 +11,16 @@ import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_token = os.getenv("TRUSTGRAPH_TOKEN", None)
def put_flow_class(url, class_name, config, token=None):
def put_flow_blueprint(url, blueprint_name, config, token=None):
api = Api(url, token=token)
class_names = api.flow().put_class(class_name, config)
blueprint_names = api.flow().put_blueprint(blueprint_name, config)
def main():
parser = argparse.ArgumentParser(
prog='tg-put-flow-class',
prog='tg-put-flow-blueprint',
description=__doc__,
)
@ -37,8 +37,8 @@ def main():
)
parser.add_argument(
'-n', '--class-name',
help=f'Flow class name',
'-n', '--blueprint-name',
help=f'Flow blueprint name',
)
parser.add_argument(
@ -50,9 +50,9 @@ def main():
try:
put_flow_class(
put_flow_blueprint(
url=args.api_url,
class_name=args.class_name,
blueprint_name=args.blueprint_name,
config=json.loads(args.config),
token=args.token,
)
@ -62,4 +62,4 @@ def main():
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()
main()

View file

@ -1,5 +1,5 @@
"""
Shows all defined flow classes.
Shows all defined flow blueprints.
"""
import argparse
@ -16,7 +16,7 @@ def format_parameters(params_metadata, config_api):
Format parameter metadata for display
Args:
params_metadata: Parameter definitions from flow class
params_metadata: Parameter definitions from flow blueprint
config_api: API client to get parameter type information
Returns:
@ -58,23 +58,23 @@ def format_parameters(params_metadata, config_api):
return "\n".join(param_list)
def show_flow_classes(url, token=None):
def show_flow_blueprints(url, token=None):
api = Api(url, token=token)
flow_api = api.flow()
config_api = api.config()
class_names = flow_api.list_classes()
blueprint_names = flow_api.list_blueprints()
if len(class_names) == 0:
print("No flow classes.")
if len(blueprint_names) == 0:
print("No flow blueprints.")
return
for class_name in class_names:
cls = flow_api.get_class(class_name)
for blueprint_name in blueprint_names:
cls = flow_api.get_blueprint(blueprint_name)
table = []
table.append(("name", class_name))
table.append(("name", blueprint_name))
table.append(("description", cls.get("description", "")))
tags = cls.get("tags", [])
@ -97,7 +97,7 @@ def show_flow_classes(url, token=None):
def main():
parser = argparse.ArgumentParser(
prog='tg-show-flow-classes',
prog='tg-show-flow-blueprints',
description=__doc__,
)
@ -117,7 +117,7 @@ def main():
try:
show_flow_classes(
show_flow_blueprints(
url=args.api_url,
token=args.token,
)
@ -127,4 +127,4 @@ def main():
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()
main()

View file

@ -32,7 +32,9 @@ class Service(ToolService):
logger.info(f"Got config version {version}")
if "mcp" not in config: return
if "mcp" not in config:
self.mcp_services = {}
return
self.mcp_services = {
k: json.loads(v)

View file

@ -13,26 +13,26 @@ class FlowConfig:
# Cache for parameter type definitions to avoid repeated lookups
self.param_type_cache = {}
async def resolve_parameters(self, flow_class, user_params):
async def resolve_parameters(self, flow_blueprint, user_params):
"""
Resolve parameters by merging user-provided values with defaults.
Args:
flow_class: The flow class definition dict
flow_blueprint: The flow blueprint definition dict
user_params: User-provided parameters dict (may be None or empty)
Returns:
Complete parameter dict with user values and defaults merged (all values as strings)
"""
# If the flow class has no parameters section, return user params as-is (stringified)
if "parameters" not in flow_class:
# If the flow blueprint has no parameters section, return user params as-is (stringified)
if "parameters" not in flow_blueprint:
if not user_params:
return {}
# Ensure all values are strings
return {k: str(v) for k, v in user_params.items()}
resolved = {}
flow_params = flow_class["parameters"]
flow_params = flow_blueprint["parameters"]
user_params = user_params if user_params else {}
# First pass: resolve parameters with explicit values or defaults
@ -92,7 +92,7 @@ class FlowConfig:
else:
resolved[param_name] = str(default_value)
# Include any extra parameters from user that weren't in flow class definition
# Include any extra parameters from user that weren't in flow blueprint definition
# This allows for forward compatibility (ensure they're strings)
for key, value in user_params.items():
if key not in resolved:
@ -100,28 +100,28 @@ class FlowConfig:
return resolved
async def handle_list_classes(self, msg):
async def handle_list_blueprints(self, msg):
names = list(await self.config.get("flow-classes").keys())
names = list(await self.config.get("flow-blueprints").keys())
return FlowResponse(
error = None,
class_names = names,
blueprint_names = names,
)
async def handle_get_class(self, msg):
async def handle_get_blueprint(self, msg):
return FlowResponse(
error = None,
class_definition = await self.config.get(
"flow-classes"
).get(msg.class_name),
blueprint_definition = await self.config.get(
"flow-blueprints"
).get(msg.blueprint_name),
)
async def handle_put_class(self, msg):
async def handle_put_blueprint(self, msg):
await self.config.get("flow-classes").put(
msg.class_name, msg.class_definition
await self.config.get("flow-blueprints").put(
msg.blueprint_name, msg.blueprint_definition
)
await self.config.inc_version()
@ -132,11 +132,11 @@ class FlowConfig:
error = None,
)
async def handle_delete_class(self, msg):
async def handle_delete_blueprint(self, msg):
logger.debug(f"Flow config message: {msg}")
await self.config.get("flow-classes").delete(msg.class_name)
await self.config.get("flow-blueprints").delete(msg.blueprint_name)
await self.config.inc_version()
@ -169,8 +169,8 @@ class FlowConfig:
async def handle_start_flow(self, msg):
if msg.class_name is None:
raise RuntimeError("No class name")
if msg.blueprint_name is None:
raise RuntimeError("No blueprint name")
if msg.flow_id is None:
raise RuntimeError("No flow ID")
@ -181,11 +181,11 @@ class FlowConfig:
if msg.description is None:
raise RuntimeError("No description")
if msg.class_name not in await self.config.get("flow-classes").keys():
raise RuntimeError("Class does not exist")
if msg.blueprint_name not in await self.config.get("flow-blueprints").keys():
raise RuntimeError("Blueprint does not exist")
cls = json.loads(
await self.config.get("flow-classes").get(msg.class_name)
await self.config.get("flow-blueprints").get(msg.blueprint_name)
)
# Resolve parameters by merging user-provided values with defaults
@ -200,7 +200,7 @@ class FlowConfig:
def repl_template_with_params(tmp):
result = tmp.replace(
"{class}", msg.class_name
"{blueprint}", msg.blueprint_name
).replace(
"{id}", msg.flow_id
)
@ -210,7 +210,7 @@ class FlowConfig:
return result
for kind in ("class", "flow"):
for kind in ("blueprint", "flow"):
for k, v in cls[kind].items():
@ -223,7 +223,7 @@ class FlowConfig:
for k2, v2 in v.items()
}
flac = await self.config.get("flows-active").get(processor)
flac = await self.config.get("active-flow").get(processor)
if flac is not None:
target = json.loads(flac)
else:
@ -237,7 +237,7 @@ class FlowConfig:
if variant not in target:
target[variant] = v
await self.config.get("flows-active").put(
await self.config.get("active-flow").put(
processor, json.dumps(target)
)
@ -262,7 +262,7 @@ class FlowConfig:
msg.flow_id,
json.dumps({
"description": msg.description,
"class-name": msg.class_name,
"blueprint-name": msg.blueprint_name,
"interfaces": interfaces,
"parameters": parameters,
})
@ -286,17 +286,17 @@ class FlowConfig:
flow = json.loads(await self.config.get("flows").get(msg.flow_id))
if "class-name" not in flow:
raise RuntimeError("Internal error: flow has no flow class")
if "blueprint-name" not in flow:
raise RuntimeError("Internal error: flow has no flow blueprint")
class_name = flow["class-name"]
blueprint_name = flow["blueprint-name"]
parameters = flow.get("parameters", {})
cls = json.loads(await self.config.get("flow-classes").get(class_name))
cls = json.loads(await self.config.get("flow-blueprints").get(blueprint_name))
def repl_template(tmp):
result = tmp.replace(
"{class}", class_name
"{blueprint}", blueprint_name
).replace(
"{id}", msg.flow_id
)
@ -313,7 +313,7 @@ class FlowConfig:
variant = repl_template(variant)
flac = await self.config.get("flows-active").get(processor)
flac = await self.config.get("active-flow").get(processor)
if flac is not None:
target = json.loads(flac)
@ -323,7 +323,7 @@ class FlowConfig:
if variant in target:
del target[variant]
await self.config.get("flows-active").put(
await self.config.get("active-flow").put(
processor, json.dumps(target)
)
@ -342,14 +342,14 @@ class FlowConfig:
logger.debug(f"Handling flow message: {msg.operation}")
if msg.operation == "list-classes":
resp = await self.handle_list_classes(msg)
elif msg.operation == "get-class":
resp = await self.handle_get_class(msg)
elif msg.operation == "put-class":
resp = await self.handle_put_class(msg)
elif msg.operation == "delete-class":
resp = await self.handle_delete_class(msg)
if msg.operation == "list-blueprints":
resp = await self.handle_list_blueprints(msg)
elif msg.operation == "get-blueprint":
resp = await self.handle_get_blueprint(msg)
elif msg.operation == "put-blueprint":
resp = await self.handle_put_blueprint(msg)
elif msg.operation == "delete-blueprint":
resp = await self.handle_delete_blueprint(msg)
elif msg.operation == "list-flows":
resp = await self.handle_list_flows(msg)
elif msg.operation == "get-flow":

View file

@ -52,7 +52,7 @@ class Processor(FlowProcessor):
self.prices = {}
self.config_key = "token-costs"
self.config_key = "token-cost"
# Load token costs from the config service
async def on_cost_config(self, config, version):