Flow class -> flow blueprint

This commit is contained in:
Cyber MacGeddon 2026-01-14 11:35:53 +00:00
parent 7672815349
commit ba45b59f7b
4 changed files with 38 additions and 38 deletions

View file

@ -1,5 +1,5 @@
"""
Starts a processing flow using a defined flow class.
Starts a processing flow using a defined flow blueprint.
Parameters can be provided in three ways:
1. As key=value pairs: --param model=gpt-4 --param temp=0.7
@ -19,12 +19,12 @@ import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_token = os.getenv("TRUSTGRAPH_TOKEN", None)
def start_flow(url, class_name, flow_id, description, parameters=None, token=None):
def start_flow(url, blueprint_name, flow_id, description, parameters=None, token=None):
api = Api(url, token=token).flow()
api.start(
class_name = class_name,
blueprint_name = blueprint_name,
id = flow_id,
description = description,
parameters = parameters,
@ -50,9 +50,9 @@ def main():
)
parser.add_argument(
'-n', '--class-name',
'-n', '--blueprint-name',
required=True,
help=f'Flow class name',
help=f'Flow blueprint name',
)
parser.add_argument(
@ -115,7 +115,7 @@ def main():
start_flow(
url = args.api_url,
class_name = args.class_name,
blueprint_name = args.blueprint_name,
flow_id = args.flow_id,
description = args.description,
parameters = parameters,

View file

@ -194,21 +194,21 @@ def check_processors(url: str, min_processors: int, timeout: int, token: Optiona
return False, f"Processor check error: {e}"
def check_flow_classes(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]:
"""Check if flow classes are loaded."""
def check_flow_blueprints(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]:
"""Check if flow blueprints are loaded."""
try:
api = Api(url, token=token, timeout=timeout)
flow_api = api.flow()
classes = flow_api.list_classes()
blueprints = flow_api.list_blueprints()
if classes and len(classes) > 0:
return True, f"Found {len(classes)} flow class(es)"
if blueprints and len(blueprints) > 0:
return True, f"Found {len(blueprints)} flow blueprint(s)"
else:
return False, "No flow classes found"
return False, "No flow blueprints found"
except Exception as e:
return False, f"Flow classes check error: {e}"
return False, f"Flow blueprints check error: {e}"
def check_flows(url: str, timeout: int, token: Optional[str] = None) -> Tuple[bool, str]:
@ -416,8 +416,8 @@ def main():
)
checker.run_check(
"Flow Classes",
check_flow_classes,
"Flow Blueprints",
check_flow_blueprints,
args.api_url,
args.check_timeout,
args.token