diff --git a/docs/tech-specs/flow-class-definition.md b/docs/tech-specs/flow-class-definition.md index 5469144e..c6f0c7b0 100644 --- a/docs/tech-specs/flow-class-definition.md +++ b/docs/tech-specs/flow-class-definition.md @@ -6,7 +6,7 @@ A flow class defines a complete dataflow pattern template in the TrustGraph syst ## Structure -A flow class definition consists of four main sections: +A flow class definition consists of five main sections: ### 1. Class Section Defines shared service processors that are instantiated once per flow class. These processors handle requests from all flow instances of this class. @@ -15,7 +15,11 @@ Defines shared service processors that are instantiated once per flow class. The "class": { "service-name:{class}": { "request": "queue-pattern:{class}", - "response": "queue-pattern:{class}" + "response": "queue-pattern:{class}", + "settings": { + "setting-name": "fixed-value", + "parameterized-setting": "{parameter-name}" + } } } ``` @@ -24,6 +28,7 @@ Defines shared service processors that are instantiated once per flow class. The - Shared across all flow instances of the same class - Typically expensive or stateless services (LLMs, embedding models) - Use `{class}` template variable for queue naming +- Settings can be fixed values or parameterized with `{parameter-name}` syntax - Examples: `embeddings:{class}`, `text-completion:{class}`, `graph-rag:{class}` ### 2. Flow Section @@ -33,7 +38,11 @@ Defines flow-specific processors that are instantiated for each individual flow "flow": { "processor-name:{id}": { "input": "queue-pattern:{id}", - "output": "queue-pattern:{id}" + "output": "queue-pattern:{id}", + "settings": { + "setting-name": "fixed-value", + "parameterized-setting": "{parameter-name}" + } } } ``` @@ -42,6 +51,7 @@ Defines flow-specific processors that are instantiated for each individual flow - Unique instance per flow - Handle flow-specific data and state - Use `{id}` template variable for queue naming +- Settings can be fixed values or parameterized with `{parameter-name}` syntax - Examples: `chunker:{id}`, `pdf-decoder:{id}`, `kg-extract-relationships:{id}` ### 3. Interfaces Section @@ -72,7 +82,24 @@ Interfaces can take two forms: - **Service Interfaces**: Request/response patterns for services (`embeddings`, `text-completion`) - **Data Interfaces**: Fire-and-forget data flow connection points (`triples-store`, `entity-contexts-load`) -### 4. Metadata +### 4. Parameters Section +Maps flow-specific parameter names to centrally-stored parameter definitions: + +```json +"parameters": { + "model": "llm-model", + "temp": "temperature", + "chunk": "chunk-size" +} +``` + +**Characteristics:** +- Keys are parameter names used in processor settings (e.g., `{model}`) +- Values reference parameter definitions stored in schema/config +- Enables reuse of common parameter definitions across flows +- Reduces duplication of parameter schemas + +### 5. Metadata Additional information about the flow class: ```json @@ -82,16 +109,98 @@ Additional information about the flow class: ## Template Variables -### {id} +### System Variables + +#### {id} - Replaced with the unique flow instance identifier - Creates isolated resources for each flow - Example: `flow-123`, `customer-A-flow` -### {class} +#### {class} - Replaced with the flow class name - Creates shared resources across flows of the same class - Example: `standard-rag`, `enterprise-rag` +### Parameter Variables + +#### {parameter-name} +- Custom parameters defined at flow launch time +- Parameter names match keys in the flow's `parameters` section +- Used in processor settings to customize behavior +- Examples: `{model}`, `{temp}`, `{chunk}` +- Replaced with values provided when launching the flow +- Validated against centrally-stored parameter definitions + +## Processor Settings + +Settings provide configuration values to processors at instantiation time. They can be: + +### Fixed Settings +Direct values that don't change: +```json +"settings": { + "model": "gemma3:12b", + "temperature": 0.7, + "max_retries": 3 +} +``` + +### Parameterized Settings +Values that use parameters provided at flow launch: +```json +"settings": { + "model": "{model}", + "temperature": "{temp}", + "endpoint": "https://{region}.api.example.com" +} +``` + +Parameter names in settings correspond to keys in the flow's `parameters` section. + +### Settings Examples + +**LLM Processor with Parameters:** +```json +// In parameters section: +"parameters": { + "model": "llm-model", + "temp": "temperature", + "tokens": "max-tokens", + "key": "openai-api-key" +} + +// In processor definition: +"text-completion:{class}": { + "request": "non-persistent://tg/request/text-completion:{class}", + "response": "non-persistent://tg/response/text-completion:{class}", + "settings": { + "model": "{model}", + "temperature": "{temp}", + "max_tokens": "{tokens}", + "api_key": "{key}" + } +} +``` + +**Chunker with Fixed and Parameterized Settings:** +```json +// In parameters section: +"parameters": { + "chunk": "chunk-size" +} + +// In processor definition: +"chunker:{id}": { + "input": "persistent://tg/flow/chunk:{id}", + "output": "persistent://tg/flow/chunk-load:{id}", + "settings": { + "chunk_size": "{chunk}", + "chunk_overlap": 100, + "encoding": "utf-8" + } +} +``` + ## Queue Patterns (Pulsar) Flow classes use Apache Pulsar for messaging. Queue names follow the Pulsar format: @@ -137,15 +246,27 @@ All processors (both `{id}` and `{class}`) work together as a cohesive dataflow Given: - Flow Instance ID: `customer-A-flow` - Flow Class: `standard-rag` +- Flow parameter mappings: + - `"model": "llm-model"` + - `"temp": "temperature"` + - `"chunk": "chunk-size"` +- User-provided parameters: + - `model`: `gpt-4` + - `temp`: `0.5` + - `chunk`: `512` Template expansions: - `persistent://tg/flow/chunk-load:{id}` → `persistent://tg/flow/chunk-load:customer-A-flow` - `non-persistent://tg/request/embeddings:{class}` → `non-persistent://tg/request/embeddings:standard-rag` +- `"model": "{model}"` → `"model": "gpt-4"` +- `"temperature": "{temp}"` → `"temperature": "0.5"` +- `"chunk_size": "{chunk}"` → `"chunk_size": "512"` This creates: - Isolated document processing pipeline for `customer-A-flow` - Shared embedding service for all `standard-rag` flows - Complete dataflow from document ingestion through querying +- Processors configured with the provided parameter values ## Benefits diff --git a/docs/tech-specs/flow-configurable-parameters.md b/docs/tech-specs/flow-configurable-parameters.md new file mode 100644 index 00000000..1f48b853 --- /dev/null +++ b/docs/tech-specs/flow-configurable-parameters.md @@ -0,0 +1,333 @@ +# Flow Class Configurable Parameters Technical Specification + +## Overview + +This specification describes the implementation of configurable parameters for flow classes in TrustGraph. Parameters enable users to customize processor settings at flow launch time by providing values that replace parameter placeholders in the flow class definition. + +Parameters work through template variable substitution in processor settings, similar to how `{id}` and `{class}` variables work, but with user-provided values. + +The integration supports four primary use cases: + +1. **Model Selection**: Allowing users to choose different LLM models (e.g., `gemma3:8b`, `gpt-4`, `claude-3`) for processors +2. **Resource Configuration**: Adjusting processor settings like chunk sizes, batch sizes, and concurrency limits +3. **Behavioral Tuning**: Modifying processor behavior through parameters like temperature, max-tokens, or retrieval thresholds +4. **Environment-Specific Settings**: Configuring endpoints, API keys, or region-specific URLs per deployment + +## Goals + +- **Dynamic Processor Configuration**: Enable runtime configuration of processor settings through parameter substitution +- **Parameter Validation**: Provide type checking and validation for parameters at flow launch time +- **Default Values**: Support sensible defaults while allowing overrides for advanced users +- **Template Substitution**: Seamlessly replace parameter placeholders in processor settings +- **UI Integration**: Enable parameter input through both API and UI interfaces +- **Type Safety**: Ensure parameter types match expected processor setting types +- **Documentation**: Self-documenting parameter schemas within flow class definitions +- **Backward Compatibility**: Maintain compatibility with existing flow classes that don't use parameters + +## Background + +Flow classes in TrustGraph now support processor settings that can contain either fixed values or parameter placeholders. This creates an opportunity for runtime customization. + +Current processor settings support: +- Fixed values: `"model": "gemma3:12b"` +- Parameter placeholders: `"model": "gemma3:{model-size}"` + +This specification defines how parameters are: +- Declared in flow class definitions +- Validated when flows are launched +- Substituted in processor settings +- Exposed through APIs and UI + +By leveraging parameterized processor settings, TrustGraph can: +- Reduce flow class duplication by using parameters for variations +- Enable users to tune processor behavior without modifying definitions +- Support environment-specific configurations through parameter values +- Maintain type safety through parameter schema validation + +## Technical Design + +### Architecture + +The configurable parameters system requires the following technical components: + +1. **Parameter Schema Definition** + - JSON Schema-based parameter definitions within flow class metadata + - Type definitions including string, number, boolean, enum, and object types + - Validation rules including min/max values, patterns, and required fields + + Module: trustgraph-flow/trustgraph/flow/definition.py + +2. **Parameter Resolution Engine** + - Runtime parameter validation against schema + - Default value application for unspecified parameters + - Parameter injection into flow execution context + - Type coercion and conversion as needed + + Module: trustgraph-flow/trustgraph/flow/parameter_resolver.py + +3. **Parameter Store Integration** + - Retrieval of parameter definitions from schema/config store + - Caching of frequently-used parameter definitions + - Validation against centrally-stored schemas + + Module: trustgraph-flow/trustgraph/flow/parameter_store.py + +4. **Flow Launcher Extensions** + - API extensions to accept parameter values during flow launch + - Parameter mapping resolution (flow names to definition names) + - Error handling for invalid parameter combinations + + Module: trustgraph-flow/trustgraph/flow/launcher.py + +5. **UI Parameter Forms** + - Dynamic form generation from parameter schemas + - Input validation and error display + - Parameter presets and templates + + Module: trustgraph-ui/components/flow-parameters/ + +### Data Models + +#### Parameter Definitions (Stored in Schema/Config) + +Parameter definitions are stored centrally in the schema and config system with type "parameters": + +```json +{ + "llm-model": { + "type": "string", + "description": "LLM model to use", + "default": "gpt-4", + "enum": ["gpt-4", "gpt-3.5-turbo", "claude-3", "gemma3:8b"], + "required": false + }, + "model-size": { + "type": "string", + "description": "Model size variant", + "default": "8b", + "enum": ["2b", "8b", "12b", "70b"], + "required": false + }, + "temperature": { + "type": "number", + "description": "Model temperature for generation", + "default": 0.7, + "minimum": 0.0, + "maximum": 2.0, + "required": false + }, + "chunk-size": { + "type": "integer", + "description": "Document chunk size", + "default": 512, + "minimum": 128, + "maximum": 2048, + "required": false + } +} +``` + +#### Flow Class with Parameter References + +Flow classes reference parameter definitions by name: + +```json +{ + "flow_class": "document-analysis", + "parameters": { + "model": "llm-model", + "size": "model-size", + "temp": "temperature", + "chunk": "chunk-size" + }, + "class": { + "text-completion:{class}": { + "request": "non-persistent://tg/request/text-completion:{class}", + "response": "non-persistent://tg/response/text-completion:{class}", + "settings": { + "model": "{model}", + "temperature": "{temp}" + } + } + }, + "flow": { + "chunker:{id}": { + "input": "persistent://tg/flow/chunk:{id}", + "output": "persistent://tg/flow/chunk-load:{id}", + "settings": { + "chunk_size": "{chunk}", + "chunk_overlap": 100 + } + } + } +} +``` + +The `parameters` section maps flow-specific parameter names (keys) to centrally-defined parameter definitions (values). +``` + +This approach allows: +- Reusable parameter definitions across multiple flow classes +- Centralized parameter management and updates +- Reduced duplication in flow class definitions +- Consistent parameter validation across flows +- Easy addition of new standard parameters + +#### Flow Launch Request + +The flow launch API accepts parameters using the flow's parameter names: + +```json +{ + "flow_class": "document-analysis", + "flow_id": "customer-A-flow", + "parameters": { + "model": "claude-3", + "size": "12b", + "temp": 0.5, + "chunk": 1024 + } +} +``` + +The system will: +1. Map flow parameter names to their definitions (e.g., `model` → `llm-model`) +2. Validate values against the parameter definitions +3. Substitute values into processor settings during flow instantiation + +### Implementation Details + +#### Parameter Resolution Process + +1. **Flow Class Loading**: Load flow class and extract parameter references +2. **Definition Lookup**: Retrieve parameter definitions from schema/config store +3. **Validation**: Validate user-provided parameters against definitions +4. **Default Application**: Apply default values for missing parameters +5. **Template Substitution**: Replace parameter placeholders in processor settings +6. **Processor Instantiation**: Create processors with substituted settings + +#### Pulsar Integration + +1. **Start-Flow Operation** + - The Pulsar start-flow operation needs to accept a `parameters` field containing a map of parameter values + - The Pulsar schema for the start-flow request must be updated to include the optional `parameters` field + - Example request: + ```json + { + "flow_class": "document-analysis", + "flow_id": "customer-A-flow", + "parameters": { + "model": "claude-3", + "size": "12b", + "temp": 0.5, + "chunk": 1024 + } + } + ``` + +2. **Get-Flow Operation** + - The Pulsar schema for the get-flow response must be updated to include the `parameters` field + - This allows clients to retrieve the parameter values that were used when the flow was started + - Example response: + ```json + { + "flow_id": "customer-A-flow", + "flow_class": "document-analysis", + "status": "running", + "parameters": { + "model": "claude-3", + "size": "12b", + "temp": 0.5, + "chunk": 1024 + } + } + ``` + +#### Config System Integration + +3. **Flow Object Storage** + - When a flow is added to the config system by the flow component in the config manager, the flow object must include the resolved parameter values + - The config manager needs to store both the original user-provided parameters and the resolved values (with defaults applied) + - Flow objects in the config system should include: + - `parameters`: The final resolved parameter values used for the flow + - `parameter_definitions`: Reference to the parameter schema used for validation + +#### CLI Integration + +4. **Library CLI Commands** + - CLI commands that start flows need parameter support: + - Accept parameter values via command-line flags or configuration files + - Validate parameters against flow class definitions before submission + - Support parameter file input (JSON/YAML) for complex parameter sets + + - CLI commands that show flows need to display parameter information: + - Show parameter values used when the flow was started + - Display available parameters for a flow class + - Show parameter validation schemas and defaults + +#### Processor Base Class Integration + +5. **SettingsSpec Support** + - Processor base classes need to support parameter substitution through the existing SettingsSpec mechanism + - The SettingsSpec class (located in the same module as ConsumerSpec and ProducerSpec) should be enhanced if necessary to support parameter template substitution + - Processors should be able to invoke SettingsSpec to configure their settings with parameter values resolved at flow launch time + - The SettingsSpec implementation needs to: + - Accept settings configurations that contain parameter placeholders (e.g., `{model}`, `{temperature}`) + - Support runtime parameter substitution when the processor is instantiated + - Validate that substituted values match expected types and constraints + - Provide error handling for missing or invalid parameter references + +#### Substitution Rules + +- Parameters use the format `{parameter-name}` in processor settings +- Parameter names in settings match the keys in the flow's `parameters` section +- Substitution occurs alongside `{id}` and `{class}` replacement +- Invalid parameter references result in launch-time errors +- Type validation happens based on the centrally-stored parameter definition + +Example resolution: +``` +Flow parameter mapping: "model": "llm-model" +Processor setting: "model": "{model}" +User provides: "model": "gemma3:8b" +Final setting: "model": "gemma3:8b" +``` + +## Testing Strategy + +- Unit tests for parameter schema validation +- Integration tests for parameter substitution in processor settings +- End-to-end tests for launching flows with different parameter values +- UI tests for parameter form generation and validation +- Performance tests for flows with many parameters +- Edge cases: missing parameters, invalid types, undefined parameter references + +## Migration Plan + +1. The system should continue to support flow classes with no parameters + declared. +2. The system should continue to support flows no parameters specified: + This works for flows with no parameters, and flows with parameters + (they have defaults). + +## Open Questions + +Q: Should parameters support complex nested objects or keep to simple types? +A: The parameter values will be string encoded, we're probably going to want + to stick to strings. + +Q: Should parameter placeholders be allowed in queue names or only in + settings? +A: Only in settings to remove strange injections and edge-cases. + +Q: How to handle conflicts between parameter names and system variables like + `id` and `class`? +A: It is not valid to specify id and class when launching a flow + +Q: Should we support computed parameters (derived from other parameters)? +A: Just string substitution to remove strange injections and edge-cases. + +## References + +- JSON Schema Specification: https://json-schema.org/ +- Flow Class Definition Spec: docs/tech-specs/flow-class-definition.md diff --git a/trustgraph-base/trustgraph/api/flow.py b/trustgraph-base/trustgraph/api/flow.py index d1d5f95e..0214a4bd 100644 --- a/trustgraph-base/trustgraph/api/flow.py +++ b/trustgraph-base/trustgraph/api/flow.py @@ -87,7 +87,7 @@ class Flow: return json.loads(self.request(request = input)["flow"]) - def start(self, class_name, id, description): + def start(self, class_name, id, description, parameters=None): # The input consists of system and prompt strings input = { @@ -97,6 +97,9 @@ class Flow: "description": description, } + if parameters: + input["parameters"] = parameters + self.request(request = input) def stop(self, id): diff --git a/trustgraph-base/trustgraph/messaging/translators/flow.py b/trustgraph-base/trustgraph/messaging/translators/flow.py index f05767c8..8c1a019a 100644 --- a/trustgraph-base/trustgraph/messaging/translators/flow.py +++ b/trustgraph-base/trustgraph/messaging/translators/flow.py @@ -12,12 +12,13 @@ class FlowRequestTranslator(MessageTranslator): class_name=data.get("class-name"), class_definition=data.get("class-definition"), description=data.get("description"), - flow_id=data.get("flow-id") + flow_id=data.get("flow-id"), + parameters=data.get("parameters") ) def from_pulsar(self, obj: FlowRequest) -> Dict[str, Any]: result = {} - + if obj.operation is not None: result["operation"] = obj.operation if obj.class_name is not None: @@ -28,7 +29,9 @@ class FlowRequestTranslator(MessageTranslator): result["description"] = obj.description if obj.flow_id is not None: result["flow-id"] = obj.flow_id - + if obj.parameters is not None: + result["parameters"] = obj.parameters + return result @@ -40,7 +43,7 @@ class FlowResponseTranslator(MessageTranslator): def from_pulsar(self, obj: FlowResponse) -> Dict[str, Any]: result = {} - + if obj.class_names is not None: result["class-names"] = obj.class_names if obj.flow_ids is not None: @@ -51,7 +54,9 @@ class FlowResponseTranslator(MessageTranslator): result["flow"] = obj.flow if obj.description is not None: result["description"] = obj.description - + if obj.parameters is not None: + result["parameters"] = obj.parameters + return result def from_response_with_completion(self, obj: FlowResponse) -> Tuple[Dict[str, Any], bool]: diff --git a/trustgraph-base/trustgraph/schema/services/flow.py b/trustgraph-base/trustgraph/schema/services/flow.py index 0b5c1bfd..d03e559b 100644 --- a/trustgraph-base/trustgraph/schema/services/flow.py +++ b/trustgraph-base/trustgraph/schema/services/flow.py @@ -35,6 +35,9 @@ class FlowRequest(Record): # get_flow, start_flow, stop_flow flow_id = String() + # start_flow - optional parameters for flow customization + parameters = Map(String()) + class FlowResponse(Record): # list_classes @@ -52,6 +55,9 @@ class FlowResponse(Record): # get_flow description = String() + # get_flow - parameters used when flow was started + parameters = Map(String()) + # Everything error = Error() diff --git a/trustgraph-cli/trustgraph/cli/show_flows.py b/trustgraph-cli/trustgraph/cli/show_flows.py index a405d830..06fbfbe8 100644 --- a/trustgraph-cli/trustgraph/cli/show_flows.py +++ b/trustgraph-cli/trustgraph/cli/show_flows.py @@ -74,6 +74,13 @@ def show_flows(url): table.append(("id", id)) table.append(("class", flow.get("class-name", ""))) table.append(("desc", flow.get("description", ""))) + + # Display parameters if they exist + parameters = flow.get("parameters", {}) + if parameters: + param_str = json.dumps(parameters, indent=2) + table.append(("parameters", param_str)) + table.append(("queue", describe_interfaces(interface_defs, flow))) print(tabulate.tabulate( diff --git a/trustgraph-cli/trustgraph/cli/start_flow.py b/trustgraph-cli/trustgraph/cli/start_flow.py index 36048474..e8c1822c 100644 --- a/trustgraph-cli/trustgraph/cli/start_flow.py +++ b/trustgraph-cli/trustgraph/cli/start_flow.py @@ -10,7 +10,7 @@ import json default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') -def start_flow(url, class_name, flow_id, description): +def start_flow(url, class_name, flow_id, description, parameters=None): api = Api(url).flow() @@ -18,6 +18,7 @@ def start_flow(url, class_name, flow_id, description): class_name = class_name, id = flow_id, description = description, + parameters = parameters, ) def main(): @@ -51,15 +52,34 @@ def main(): help=f'Flow description', ) + parser.add_argument( + '-p', '--parameters', + help=f'Flow parameters as JSON string (e.g., \'{"model": "gpt-4", "temp": 0.7}\')', + ) + + parser.add_argument( + '--parameters-file', + help=f'Path to JSON file containing flow parameters', + ) + args = parser.parse_args() try: + # Parse parameters from command line arguments + parameters = None + + if args.parameters_file: + with open(args.parameters_file, 'r') as f: + parameters = json.load(f) + elif args.parameters: + parameters = json.loads(args.parameters) start_flow( url = args.api_url, class_name = args.class_name, flow_id = args.flow_id, description = args.description, + parameters = parameters, ) except Exception as e: diff --git a/trustgraph-flow/trustgraph/config/service/flow.py b/trustgraph-flow/trustgraph/config/service/flow.py index 3e83f8fa..5789ef0d 100644 --- a/trustgraph-flow/trustgraph/config/service/flow.py +++ b/trustgraph-flow/trustgraph/config/service/flow.py @@ -68,11 +68,14 @@ class FlowConfig: async def handle_get_flow(self, msg): - flow = await self.config.get("flows").get(msg.flow_id) + flow_data = await self.config.get("flows").get(msg.flow_id) + flow = json.loads(flow_data) return FlowResponse( error = None, - flow = flow, + flow = flow_data, + description = flow.get("description", ""), + parameters = flow.get("parameters", {}), ) async def handle_start_flow(self, msg): @@ -92,16 +95,24 @@ class FlowConfig: if msg.class_name not in await self.config.get("flow-classes").values(): raise RuntimeError("Class does not exist") - def repl_template(tmp): - return tmp.replace( + cls = json.loads( + await self.config.get("flow-classes").get(msg.class_name) + ) + + # Get parameters from message (default to empty dict if not provided) + parameters = msg.parameters if msg.parameters else {} + + # Apply parameter substitution to template replacement function + def repl_template_with_params(tmp): + result = tmp.replace( "{class}", msg.class_name ).replace( "{id}", msg.flow_id ) - - cls = json.loads( - await self.config.get("flow-classes").get(msg.class_name) - ) + # Apply parameter substitutions + for param_name, param_value in parameters.items(): + result = result.replace(f"{{{param_name}}}", str(param_value)) + return result for kind in ("class", "flow"): @@ -109,10 +120,10 @@ class FlowConfig: processor, variant = k.split(":", 1) - variant = repl_template(variant) + variant = repl_template_with_params(variant) v = { - repl_template(k2): repl_template(v2) + repl_template_with_params(k2): repl_template_with_params(v2) for k2, v2 in v.items() } @@ -131,10 +142,10 @@ class FlowConfig: def repl_interface(i): if isinstance(i, str): - return repl_template(i) + return repl_template_with_params(i) else: return { - k: repl_template(v) + k: repl_template_with_params(v) for k, v in i.items() } @@ -152,6 +163,7 @@ class FlowConfig: "description": msg.description, "class-name": msg.class_name, "interfaces": interfaces, + "parameters": parameters, }) ) @@ -177,15 +189,20 @@ class FlowConfig: raise RuntimeError("Internal error: flow has no flow class") class_name = flow["class-name"] + parameters = flow.get("parameters", {}) cls = json.loads(await self.config.get("flow-classes").get(class_name)) def repl_template(tmp): - return tmp.replace( + result = tmp.replace( "{class}", class_name ).replace( "{id}", msg.flow_id ) + # Apply parameter substitutions + for param_name, param_value in parameters.items(): + result = result.replace(f"{{{param_name}}}", str(param_value)) + return result for kind in ("flow",):