From 95fd60b7bd5fdbae1b738c0eed332b7513a4042e Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Wed, 14 Jan 2026 11:11:52 +0000 Subject: [PATCH] Flow class / flow blueprint --- .../trustgraph/base/flow_processor.py | 6 +- .../trustgraph/messaging/translators/flow.py | 20 ++--- .../trustgraph/schema/services/flow.py | 28 +++--- trustgraph-cli/pyproject.toml | 8 +- ...flow_class.py => delete_flow_blueprint.py} | 18 ++-- ...et_flow_class.py => get_flow_blueprint.py} | 18 ++-- ...ut_flow_class.py => put_flow_blueprint.py} | 20 ++--- ...low_classes.py => show_flow_blueprints.py} | 24 +++--- .../trustgraph/agent/mcp_tool/service.py | 4 +- .../trustgraph/config/service/flow.py | 86 +++++++++---------- .../trustgraph/metering/counter.py | 2 +- 11 files changed, 118 insertions(+), 116 deletions(-) rename trustgraph-cli/trustgraph/cli/{delete_flow_class.py => delete_flow_blueprint.py} (65%) rename trustgraph-cli/trustgraph/cli/{get_flow_class.py => get_flow_blueprint.py} (67%) rename trustgraph-cli/trustgraph/cli/{put_flow_class.py => put_flow_blueprint.py} (69%) rename trustgraph-cli/trustgraph/cli/{show_flow_classes.py => show_flow_blueprints.py} (85%) diff --git a/trustgraph-base/trustgraph/base/flow_processor.py b/trustgraph-base/trustgraph/base/flow_processor.py index 385e1346..0f170030 100644 --- a/trustgraph-base/trustgraph/base/flow_processor.py +++ b/trustgraph-base/trustgraph/base/flow_processor.py @@ -63,13 +63,13 @@ class FlowProcessor(AsyncProcessor): logger.info(f"Got config version {version}") # Skip over invalid data - if "flows-active" not in config: return + if "active-flow" not in config: return # Check there's configuration information for me - if self.id in config["flows-active"]: + if self.id in config["active-flow"]: # Get my flow config - flow_config = json.loads(config["flows-active"][self.id]) + flow_config = json.loads(config["active-flow"][self.id]) else: diff --git a/trustgraph-base/trustgraph/messaging/translators/flow.py b/trustgraph-base/trustgraph/messaging/translators/flow.py index 8c1a019a..542b65ec 100644 --- a/trustgraph-base/trustgraph/messaging/translators/flow.py +++ b/trustgraph-base/trustgraph/messaging/translators/flow.py @@ -9,8 +9,8 @@ class FlowRequestTranslator(MessageTranslator): def to_pulsar(self, data: Dict[str, Any]) -> FlowRequest: return FlowRequest( operation=data.get("operation"), - class_name=data.get("class-name"), - class_definition=data.get("class-definition"), + blueprint_name=data.get("blueprint-name"), + blueprint_definition=data.get("blueprint-definition"), description=data.get("description"), flow_id=data.get("flow-id"), parameters=data.get("parameters") @@ -21,10 +21,10 @@ class FlowRequestTranslator(MessageTranslator): if obj.operation is not None: result["operation"] = obj.operation - if obj.class_name is not None: - result["class-name"] = obj.class_name - if obj.class_definition is not None: - result["class-definition"] = obj.class_definition + if obj.blueprint_name is not None: + result["blueprint-name"] = obj.blueprint_name + if obj.blueprint_definition is not None: + result["blueprint-definition"] = obj.blueprint_definition if obj.description is not None: result["description"] = obj.description if obj.flow_id is not None: @@ -44,12 +44,12 @@ class FlowResponseTranslator(MessageTranslator): def from_pulsar(self, obj: FlowResponse) -> Dict[str, Any]: result = {} - if obj.class_names is not None: - result["class-names"] = obj.class_names + if obj.blueprint_names is not None: + result["blueprint-names"] = obj.blueprint_names if obj.flow_ids is not None: result["flow-ids"] = obj.flow_ids - if obj.class_definition is not None: - result["class-definition"] = obj.class_definition + if obj.blueprint_definition is not None: + result["blueprint-definition"] = obj.blueprint_definition if obj.flow is not None: result["flow"] = obj.flow if obj.description is not None: diff --git a/trustgraph-base/trustgraph/schema/services/flow.py b/trustgraph-base/trustgraph/schema/services/flow.py index b993b1b3..cf62c84d 100644 --- a/trustgraph-base/trustgraph/schema/services/flow.py +++ b/trustgraph-base/trustgraph/schema/services/flow.py @@ -7,27 +7,27 @@ from ..core.primitives import Error ############################################################################ # Flow service: -# list_classes() -> (classname[]) -# get_class(classname) -> (class) -# put_class(class) -> (class) -# delete_class(classname) -> () +# list_blueprints() -> (blueprintname[]) +# get_blueprint(blueprintname) -> (blueprint) +# put_blueprint(blueprint) -> (blueprint) +# delete_blueprint(blueprintname) -> () # # list_flows() -> (flowid[]) # get_flow(flowid) -> (flow) -# start_flow(flowid, classname) -> () +# start_flow(flowid, blueprintname) -> () # stop_flow(flowid) -> () # Prompt services, abstract the prompt generation @dataclass class FlowRequest: - operation: str = "" # list-classes, get-class, put-class, delete-class + operation: str = "" # list-blueprints, get-blueprint, put-blueprint, delete-blueprint # list-flows, get-flow, start-flow, stop-flow - # get_class, put_class, delete_class, start_flow - class_name: str = "" + # get_blueprint, put_blueprint, delete_blueprint, start_flow + blueprint_name: str = "" - # put_class - class_definition: str = "" + # put_blueprint + blueprint_definition: str = "" # start_flow description: str = "" @@ -40,14 +40,14 @@ class FlowRequest: @dataclass class FlowResponse: - # list_classes - class_names: list[str] = field(default_factory=list) + # list_blueprints + blueprint_names: list[str] = field(default_factory=list) # list_flows flow_ids: list[str] = field(default_factory=list) - # get_class - class_definition: str = "" + # get_blueprint + blueprint_definition: str = "" # get_flow flow: str = "" diff --git a/trustgraph-cli/pyproject.toml b/trustgraph-cli/pyproject.toml index 65921d92..5568bf91 100644 --- a/trustgraph-cli/pyproject.toml +++ b/trustgraph-cli/pyproject.toml @@ -29,13 +29,13 @@ Homepage = "https://github.com/trustgraph-ai/trustgraph" [project.scripts] tg-add-library-document = "trustgraph.cli.add_library_document:main" -tg-delete-flow-class = "trustgraph.cli.delete_flow_class:main" +tg-delete-flow-blueprint = "trustgraph.cli.delete_flow_blueprint:main" tg-delete-mcp-tool = "trustgraph.cli.delete_mcp_tool:main" tg-delete-kg-core = "trustgraph.cli.delete_kg_core:main" tg-delete-tool = "trustgraph.cli.delete_tool:main" tg-dump-msgpack = "trustgraph.cli.dump_msgpack:main" tg-dump-queues = "trustgraph.cli.dump_queues:main" -tg-get-flow-class = "trustgraph.cli.get_flow_class:main" +tg-get-flow-blueprint = "trustgraph.cli.get_flow_blueprint:main" tg-get-kg-core = "trustgraph.cli.get_kg_core:main" tg-graph-to-turtle = "trustgraph.cli.graph_to_turtle:main" tg-init-trustgraph = "trustgraph.cli.init_trustgraph:main" @@ -56,7 +56,7 @@ tg-load-text = "trustgraph.cli.load_text:main" tg-load-turtle = "trustgraph.cli.load_turtle:main" tg-load-knowledge = "trustgraph.cli.load_knowledge:main" tg-load-structured-data = "trustgraph.cli.load_structured_data:main" -tg-put-flow-class = "trustgraph.cli.put_flow_class:main" +tg-put-flow-blueprint = "trustgraph.cli.put_flow_blueprint:main" tg-put-kg-core = "trustgraph.cli.put_kg_core:main" tg-remove-library-document = "trustgraph.cli.remove_library_document:main" tg-save-doc-embeds = "trustgraph.cli.save_doc_embeds:main" @@ -65,7 +65,7 @@ tg-set-prompt = "trustgraph.cli.set_prompt:main" tg-set-token-costs = "trustgraph.cli.set_token_costs:main" tg-set-tool = "trustgraph.cli.set_tool:main" tg-show-config = "trustgraph.cli.show_config:main" -tg-show-flow-classes = "trustgraph.cli.show_flow_classes:main" +tg-show-flow-blueprints = "trustgraph.cli.show_flow_blueprints:main" tg-show-flow-state = "trustgraph.cli.show_flow_state:main" tg-show-flows = "trustgraph.cli.show_flows:main" tg-show-graph = "trustgraph.cli.show_graph:main" diff --git a/trustgraph-cli/trustgraph/cli/delete_flow_class.py b/trustgraph-cli/trustgraph/cli/delete_flow_blueprint.py similarity index 65% rename from trustgraph-cli/trustgraph/cli/delete_flow_class.py rename to trustgraph-cli/trustgraph/cli/delete_flow_blueprint.py index ba0a5a9c..9ff8aeba 100644 --- a/trustgraph-cli/trustgraph/cli/delete_flow_class.py +++ b/trustgraph-cli/trustgraph/cli/delete_flow_blueprint.py @@ -1,5 +1,5 @@ """ -Deletes a flow class +Deletes a flow blueprint """ import argparse @@ -10,16 +10,16 @@ import json default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') -def delete_flow_class(url, class_name): +def delete_flow_blueprint(url, blueprint_name): api = Api(url).flow() - class_names = api.delete_class(class_name) + blueprint_names = api.delete_blueprint(blueprint_name) def main(): parser = argparse.ArgumentParser( - prog='tg-delete-flow-class', + prog='tg-delete-flow-blueprint', description=__doc__, ) @@ -30,17 +30,17 @@ def main(): ) parser.add_argument( - '-n', '--class-name', - help=f'Flow class name', + '-n', '--blueprint-name', + help=f'Flow blueprint name', ) args = parser.parse_args() try: - delete_flow_class( + delete_flow_blueprint( url=args.api_url, - class_name=args.class_name, + blueprint_name=args.blueprint_name, ) except Exception as e: @@ -48,4 +48,4 @@ def main(): print("Exception:", e, flush=True) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/trustgraph-cli/trustgraph/cli/get_flow_class.py b/trustgraph-cli/trustgraph/cli/get_flow_blueprint.py similarity index 67% rename from trustgraph-cli/trustgraph/cli/get_flow_class.py rename to trustgraph-cli/trustgraph/cli/get_flow_blueprint.py index 5479e507..817b8f47 100644 --- a/trustgraph-cli/trustgraph/cli/get_flow_class.py +++ b/trustgraph-cli/trustgraph/cli/get_flow_blueprint.py @@ -1,5 +1,5 @@ """ -Outputs a flow class definition in JSON format. +Outputs a flow blueprint definition in JSON format. """ import argparse @@ -10,18 +10,18 @@ import json default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') -def get_flow_class(url, class_name): +def get_flow_blueprint(url, blueprint_name): api = Api(url).flow() - cls = api.get_class(class_name) + cls = api.get_blueprint(blueprint_name) print(json.dumps(cls, indent=4)) def main(): parser = argparse.ArgumentParser( - prog='tg-get-flow-class', + prog='tg-get-flow-blueprint', description=__doc__, ) @@ -32,18 +32,18 @@ def main(): ) parser.add_argument( - '-n', '--class-name', + '-n', '--blueprint-name', required=True, - help=f'Flow class name', + help=f'Flow blueprint name', ) args = parser.parse_args() try: - get_flow_class( + get_flow_blueprint( url=args.api_url, - class_name=args.class_name, + blueprint_name=args.blueprint_name, ) except Exception as e: @@ -51,4 +51,4 @@ def main(): print("Exception:", e, flush=True) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/trustgraph-cli/trustgraph/cli/put_flow_class.py b/trustgraph-cli/trustgraph/cli/put_flow_blueprint.py similarity index 69% rename from trustgraph-cli/trustgraph/cli/put_flow_class.py rename to trustgraph-cli/trustgraph/cli/put_flow_blueprint.py index 6a88421d..740a224a 100644 --- a/trustgraph-cli/trustgraph/cli/put_flow_class.py +++ b/trustgraph-cli/trustgraph/cli/put_flow_blueprint.py @@ -1,6 +1,6 @@ """ -Uploads a flow class definition. You can take the output of -tg-get-flow-class and load it back in using this utility. +Uploads a flow blueprint definition. You can take the output of +tg-get-flow-blueprint and load it back in using this utility. """ import argparse @@ -11,16 +11,16 @@ import json default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') default_token = os.getenv("TRUSTGRAPH_TOKEN", None) -def put_flow_class(url, class_name, config, token=None): +def put_flow_blueprint(url, blueprint_name, config, token=None): api = Api(url, token=token) - class_names = api.flow().put_class(class_name, config) + blueprint_names = api.flow().put_blueprint(blueprint_name, config) def main(): parser = argparse.ArgumentParser( - prog='tg-put-flow-class', + prog='tg-put-flow-blueprint', description=__doc__, ) @@ -37,8 +37,8 @@ def main(): ) parser.add_argument( - '-n', '--class-name', - help=f'Flow class name', + '-n', '--blueprint-name', + help=f'Flow blueprint name', ) parser.add_argument( @@ -50,9 +50,9 @@ def main(): try: - put_flow_class( + put_flow_blueprint( url=args.api_url, - class_name=args.class_name, + blueprint_name=args.blueprint_name, config=json.loads(args.config), token=args.token, ) @@ -62,4 +62,4 @@ def main(): print("Exception:", e, flush=True) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/trustgraph-cli/trustgraph/cli/show_flow_classes.py b/trustgraph-cli/trustgraph/cli/show_flow_blueprints.py similarity index 85% rename from trustgraph-cli/trustgraph/cli/show_flow_classes.py rename to trustgraph-cli/trustgraph/cli/show_flow_blueprints.py index 123f5380..8009cbad 100644 --- a/trustgraph-cli/trustgraph/cli/show_flow_classes.py +++ b/trustgraph-cli/trustgraph/cli/show_flow_blueprints.py @@ -1,5 +1,5 @@ """ -Shows all defined flow classes. +Shows all defined flow blueprints. """ import argparse @@ -16,7 +16,7 @@ def format_parameters(params_metadata, config_api): Format parameter metadata for display Args: - params_metadata: Parameter definitions from flow class + params_metadata: Parameter definitions from flow blueprint config_api: API client to get parameter type information Returns: @@ -58,23 +58,23 @@ def format_parameters(params_metadata, config_api): return "\n".join(param_list) -def show_flow_classes(url, token=None): +def show_flow_blueprints(url, token=None): api = Api(url, token=token) flow_api = api.flow() config_api = api.config() - class_names = flow_api.list_classes() + blueprint_names = flow_api.list_blueprints() - if len(class_names) == 0: - print("No flow classes.") + if len(blueprint_names) == 0: + print("No flow blueprints.") return - for class_name in class_names: - cls = flow_api.get_class(class_name) + for blueprint_name in blueprint_names: + cls = flow_api.get_blueprint(blueprint_name) table = [] - table.append(("name", class_name)) + table.append(("name", blueprint_name)) table.append(("description", cls.get("description", ""))) tags = cls.get("tags", []) @@ -97,7 +97,7 @@ def show_flow_classes(url, token=None): def main(): parser = argparse.ArgumentParser( - prog='tg-show-flow-classes', + prog='tg-show-flow-blueprints', description=__doc__, ) @@ -117,7 +117,7 @@ def main(): try: - show_flow_classes( + show_flow_blueprints( url=args.api_url, token=args.token, ) @@ -127,4 +127,4 @@ def main(): print("Exception:", e, flush=True) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/trustgraph-flow/trustgraph/agent/mcp_tool/service.py b/trustgraph-flow/trustgraph/agent/mcp_tool/service.py index 3858d06b..23789b96 100755 --- a/trustgraph-flow/trustgraph/agent/mcp_tool/service.py +++ b/trustgraph-flow/trustgraph/agent/mcp_tool/service.py @@ -32,7 +32,9 @@ class Service(ToolService): logger.info(f"Got config version {version}") - if "mcp" not in config: return + if "mcp" not in config: + self.mcp_services = {} + return self.mcp_services = { k: json.loads(v) diff --git a/trustgraph-flow/trustgraph/config/service/flow.py b/trustgraph-flow/trustgraph/config/service/flow.py index 42696c31..2163d107 100644 --- a/trustgraph-flow/trustgraph/config/service/flow.py +++ b/trustgraph-flow/trustgraph/config/service/flow.py @@ -13,26 +13,26 @@ class FlowConfig: # Cache for parameter type definitions to avoid repeated lookups self.param_type_cache = {} - async def resolve_parameters(self, flow_class, user_params): + async def resolve_parameters(self, flow_blueprint, user_params): """ Resolve parameters by merging user-provided values with defaults. Args: - flow_class: The flow class definition dict + flow_blueprint: The flow blueprint definition dict user_params: User-provided parameters dict (may be None or empty) Returns: Complete parameter dict with user values and defaults merged (all values as strings) """ - # If the flow class has no parameters section, return user params as-is (stringified) - if "parameters" not in flow_class: + # If the flow blueprint has no parameters section, return user params as-is (stringified) + if "parameters" not in flow_blueprint: if not user_params: return {} # Ensure all values are strings return {k: str(v) for k, v in user_params.items()} resolved = {} - flow_params = flow_class["parameters"] + flow_params = flow_blueprint["parameters"] user_params = user_params if user_params else {} # First pass: resolve parameters with explicit values or defaults @@ -92,7 +92,7 @@ class FlowConfig: else: resolved[param_name] = str(default_value) - # Include any extra parameters from user that weren't in flow class definition + # Include any extra parameters from user that weren't in flow blueprint definition # This allows for forward compatibility (ensure they're strings) for key, value in user_params.items(): if key not in resolved: @@ -100,28 +100,28 @@ class FlowConfig: return resolved - async def handle_list_classes(self, msg): + async def handle_list_blueprints(self, msg): - names = list(await self.config.get("flow-classes").keys()) + names = list(await self.config.get("flow-blueprints").keys()) return FlowResponse( error = None, - class_names = names, + blueprint_names = names, ) - async def handle_get_class(self, msg): + async def handle_get_blueprint(self, msg): return FlowResponse( error = None, - class_definition = await self.config.get( - "flow-classes" - ).get(msg.class_name), + blueprint_definition = await self.config.get( + "flow-blueprints" + ).get(msg.blueprint_name), ) - async def handle_put_class(self, msg): + async def handle_put_blueprint(self, msg): - await self.config.get("flow-classes").put( - msg.class_name, msg.class_definition + await self.config.get("flow-blueprints").put( + msg.blueprint_name, msg.blueprint_definition ) await self.config.inc_version() @@ -132,11 +132,11 @@ class FlowConfig: error = None, ) - async def handle_delete_class(self, msg): + async def handle_delete_blueprint(self, msg): logger.debug(f"Flow config message: {msg}") - await self.config.get("flow-classes").delete(msg.class_name) + await self.config.get("flow-blueprints").delete(msg.blueprint_name) await self.config.inc_version() @@ -169,8 +169,8 @@ class FlowConfig: async def handle_start_flow(self, msg): - if msg.class_name is None: - raise RuntimeError("No class name") + if msg.blueprint_name is None: + raise RuntimeError("No blueprint name") if msg.flow_id is None: raise RuntimeError("No flow ID") @@ -181,11 +181,11 @@ class FlowConfig: if msg.description is None: raise RuntimeError("No description") - if msg.class_name not in await self.config.get("flow-classes").keys(): - raise RuntimeError("Class does not exist") + if msg.blueprint_name not in await self.config.get("flow-blueprints").keys(): + raise RuntimeError("Blueprint does not exist") cls = json.loads( - await self.config.get("flow-classes").get(msg.class_name) + await self.config.get("flow-blueprints").get(msg.blueprint_name) ) # Resolve parameters by merging user-provided values with defaults @@ -200,7 +200,7 @@ class FlowConfig: def repl_template_with_params(tmp): result = tmp.replace( - "{class}", msg.class_name + "{blueprint}", msg.blueprint_name ).replace( "{id}", msg.flow_id ) @@ -210,7 +210,7 @@ class FlowConfig: return result - for kind in ("class", "flow"): + for kind in ("blueprint", "flow"): for k, v in cls[kind].items(): @@ -223,7 +223,7 @@ class FlowConfig: for k2, v2 in v.items() } - flac = await self.config.get("flows-active").get(processor) + flac = await self.config.get("active-flow").get(processor) if flac is not None: target = json.loads(flac) else: @@ -237,7 +237,7 @@ class FlowConfig: if variant not in target: target[variant] = v - await self.config.get("flows-active").put( + await self.config.get("active-flow").put( processor, json.dumps(target) ) @@ -262,7 +262,7 @@ class FlowConfig: msg.flow_id, json.dumps({ "description": msg.description, - "class-name": msg.class_name, + "blueprint-name": msg.blueprint_name, "interfaces": interfaces, "parameters": parameters, }) @@ -286,17 +286,17 @@ class FlowConfig: flow = json.loads(await self.config.get("flows").get(msg.flow_id)) - if "class-name" not in flow: - raise RuntimeError("Internal error: flow has no flow class") + if "blueprint-name" not in flow: + raise RuntimeError("Internal error: flow has no flow blueprint") - class_name = flow["class-name"] + blueprint_name = flow["blueprint-name"] parameters = flow.get("parameters", {}) - cls = json.loads(await self.config.get("flow-classes").get(class_name)) + cls = json.loads(await self.config.get("flow-blueprints").get(blueprint_name)) def repl_template(tmp): result = tmp.replace( - "{class}", class_name + "{blueprint}", blueprint_name ).replace( "{id}", msg.flow_id ) @@ -313,7 +313,7 @@ class FlowConfig: variant = repl_template(variant) - flac = await self.config.get("flows-active").get(processor) + flac = await self.config.get("active-flow").get(processor) if flac is not None: target = json.loads(flac) @@ -323,7 +323,7 @@ class FlowConfig: if variant in target: del target[variant] - await self.config.get("flows-active").put( + await self.config.get("active-flow").put( processor, json.dumps(target) ) @@ -342,14 +342,14 @@ class FlowConfig: logger.debug(f"Handling flow message: {msg.operation}") - if msg.operation == "list-classes": - resp = await self.handle_list_classes(msg) - elif msg.operation == "get-class": - resp = await self.handle_get_class(msg) - elif msg.operation == "put-class": - resp = await self.handle_put_class(msg) - elif msg.operation == "delete-class": - resp = await self.handle_delete_class(msg) + if msg.operation == "list-blueprints": + resp = await self.handle_list_blueprints(msg) + elif msg.operation == "get-blueprint": + resp = await self.handle_get_blueprint(msg) + elif msg.operation == "put-blueprint": + resp = await self.handle_put_blueprint(msg) + elif msg.operation == "delete-blueprint": + resp = await self.handle_delete_blueprint(msg) elif msg.operation == "list-flows": resp = await self.handle_list_flows(msg) elif msg.operation == "get-flow": diff --git a/trustgraph-flow/trustgraph/metering/counter.py b/trustgraph-flow/trustgraph/metering/counter.py index 07dea8ba..7851232a 100644 --- a/trustgraph-flow/trustgraph/metering/counter.py +++ b/trustgraph-flow/trustgraph/metering/counter.py @@ -52,7 +52,7 @@ class Processor(FlowProcessor): self.prices = {} - self.config_key = "token-costs" + self.config_key = "token-cost" # Load token costs from the config service async def on_cost_config(self, config, version):