Complete flow parameters work, still needs implementation in LLMs

This commit is contained in:
Cyber MacGeddon 2025-09-24 13:57:13 +01:00
parent aca11e36d6
commit 652300622f
9 changed files with 69 additions and 48 deletions

View file

@ -70,6 +70,8 @@ some-containers:
-t ${CONTAINER_BASE}/trustgraph-base:${VERSION} . -t ${CONTAINER_BASE}/trustgraph-base:${VERSION} .
${DOCKER} build -f containers/Containerfile.flow \ ${DOCKER} build -f containers/Containerfile.flow \
-t ${CONTAINER_BASE}/trustgraph-flow:${VERSION} . -t ${CONTAINER_BASE}/trustgraph-flow:${VERSION} .
${DOCKER} build -f containers/Containerfile.vertexai \
-t ${CONTAINER_BASE}/trustgraph-vertexai:${VERSION} .
# ${DOCKER} build -f containers/Containerfile.mcp \ # ${DOCKER} build -f containers/Containerfile.mcp \
# -t ${CONTAINER_BASE}/trustgraph-mcp:${VERSION} . # -t ${CONTAINER_BASE}/trustgraph-mcp:${VERSION} .
# ${DOCKER} build -f containers/Containerfile.vertexai \ # ${DOCKER} build -f containers/Containerfile.vertexai \

View file

@ -2,43 +2,43 @@
## Overview ## Overview
This specification describes the implementation of configurable parameters for flow classes in TrustGraph. Parameters enable users to customize processor settings at flow launch time by providing values that replace parameter placeholders in the flow class definition. This specification describes the implementation of configurable parameters for flow classes in TrustGraph. Parameters enable users to customize processor parameters at flow launch time by providing values that replace parameter placeholders in the flow class definition.
Parameters work through template variable substitution in processor settings, similar to how `{id}` and `{class}` variables work, but with user-provided values. Parameters work through template variable substitution in processor parameters, similar to how `{id}` and `{class}` variables work, but with user-provided values.
The integration supports four primary use cases: The integration supports four primary use cases:
1. **Model Selection**: Allowing users to choose different LLM models (e.g., `gemma3:8b`, `gpt-4`, `claude-3`) for processors 1. **Model Selection**: Allowing users to choose different LLM models (e.g., `gemma3:8b`, `gpt-4`, `claude-3`) for processors
2. **Resource Configuration**: Adjusting processor settings like chunk sizes, batch sizes, and concurrency limits 2. **Resource Configuration**: Adjusting processor parameters like chunk sizes, batch sizes, and concurrency limits
3. **Behavioral Tuning**: Modifying processor behavior through parameters like temperature, max-tokens, or retrieval thresholds 3. **Behavioral Tuning**: Modifying processor behavior through parameters like temperature, max-tokens, or retrieval thresholds
4. **Environment-Specific Settings**: Configuring endpoints, API keys, or region-specific URLs per deployment 4. **Environment-Specific Parameters**: Configuring endpoints, API keys, or region-specific URLs per deployment
## Goals ## Goals
- **Dynamic Processor Configuration**: Enable runtime configuration of processor settings through parameter substitution - **Dynamic Processor Configuration**: Enable runtime configuration of processor parameters through parameter substitution
- **Parameter Validation**: Provide type checking and validation for parameters at flow launch time - **Parameter Validation**: Provide type checking and validation for parameters at flow launch time
- **Default Values**: Support sensible defaults while allowing overrides for advanced users - **Default Values**: Support sensible defaults while allowing overrides for advanced users
- **Template Substitution**: Seamlessly replace parameter placeholders in processor settings - **Template Substitution**: Seamlessly replace parameter placeholders in processor parameters
- **UI Integration**: Enable parameter input through both API and UI interfaces - **UI Integration**: Enable parameter input through both API and UI interfaces
- **Type Safety**: Ensure parameter types match expected processor setting types - **Type Safety**: Ensure parameter types match expected processor parameter types
- **Documentation**: Self-documenting parameter schemas within flow class definitions - **Documentation**: Self-documenting parameter schemas within flow class definitions
- **Backward Compatibility**: Maintain compatibility with existing flow classes that don't use parameters - **Backward Compatibility**: Maintain compatibility with existing flow classes that don't use parameters
## Background ## Background
Flow classes in TrustGraph now support processor settings that can contain either fixed values or parameter placeholders. This creates an opportunity for runtime customization. Flow classes in TrustGraph now support processor parameters that can contain either fixed values or parameter placeholders. This creates an opportunity for runtime customization.
Current processor settings support: Current processor parameters support:
- Fixed values: `"model": "gemma3:12b"` - Fixed values: `"model": "gemma3:12b"`
- Parameter placeholders: `"model": "gemma3:{model-size}"` - Parameter placeholders: `"model": "gemma3:{model-size}"`
This specification defines how parameters are: This specification defines how parameters are:
- Declared in flow class definitions - Declared in flow class definitions
- Validated when flows are launched - Validated when flows are launched
- Substituted in processor settings - Substituted in processor parameters
- Exposed through APIs and UI - Exposed through APIs and UI
By leveraging parameterized processor settings, TrustGraph can: By leveraging parameterized processor parameters, TrustGraph can:
- Reduce flow class duplication by using parameters for variations - Reduce flow class duplication by using parameters for variations
- Enable users to tune processor behavior without modifying definitions - Enable users to tune processor behavior without modifying definitions
- Support environment-specific configurations through parameter values - Support environment-specific configurations through parameter values
@ -144,7 +144,7 @@ Flow classes reference parameter definitions by name:
"text-completion:{class}": { "text-completion:{class}": {
"request": "non-persistent://tg/request/text-completion:{class}", "request": "non-persistent://tg/request/text-completion:{class}",
"response": "non-persistent://tg/response/text-completion:{class}", "response": "non-persistent://tg/response/text-completion:{class}",
"settings": { "parameters": {
"model": "{model}", "model": "{model}",
"temperature": "{temp}" "temperature": "{temp}"
} }
@ -154,7 +154,7 @@ Flow classes reference parameter definitions by name:
"chunker:{id}": { "chunker:{id}": {
"input": "persistent://tg/flow/chunk:{id}", "input": "persistent://tg/flow/chunk:{id}",
"output": "persistent://tg/flow/chunk-load:{id}", "output": "persistent://tg/flow/chunk-load:{id}",
"settings": { "parameters": {
"chunk_size": "{chunk}", "chunk_size": "{chunk}",
"chunk_overlap": 100 "chunk_overlap": 100
} }
@ -193,7 +193,7 @@ The flow launch API accepts parameters using the flow's parameter names:
The system will: The system will:
1. Map flow parameter names to their definitions (e.g., `model``llm-model`) 1. Map flow parameter names to their definitions (e.g., `model``llm-model`)
2. Validate values against the parameter definitions 2. Validate values against the parameter definitions
3. Substitute values into processor settings during flow instantiation 3. Substitute values into processor parameters during flow instantiation
### Implementation Details ### Implementation Details
@ -203,8 +203,8 @@ The system will:
2. **Definition Lookup**: Retrieve parameter definitions from schema/config store 2. **Definition Lookup**: Retrieve parameter definitions from schema/config store
3. **Validation**: Validate user-provided parameters against definitions 3. **Validation**: Validate user-provided parameters against definitions
4. **Default Application**: Apply default values for missing parameters 4. **Default Application**: Apply default values for missing parameters
5. **Template Substitution**: Replace parameter placeholders in processor settings 5. **Template Substitution**: Replace parameter placeholders in processor parameters
6. **Processor Instantiation**: Create processors with substituted settings 6. **Processor Instantiation**: Create processors with substituted parameters
#### Pulsar Integration #### Pulsar Integration
@ -250,7 +250,6 @@ The system will:
- The config manager needs to store both the original user-provided parameters and the resolved values (with defaults applied) - The config manager needs to store both the original user-provided parameters and the resolved values (with defaults applied)
- Flow objects in the config system should include: - Flow objects in the config system should include:
- `parameters`: The final resolved parameter values used for the flow - `parameters`: The final resolved parameter values used for the flow
- `parameter_definitions`: Reference to the parameter schema used for validation
#### CLI Integration #### CLI Integration
@ -267,20 +266,20 @@ The system will:
#### Processor Base Class Integration #### Processor Base Class Integration
5. **SettingsSpec Support** 5. **ParameterSpec Support**
- Processor base classes need to support parameter substitution through the existing SettingsSpec mechanism - Processor base classes need to support parameter substitution through the existing ParametersSpec mechanism
- The SettingsSpec class (located in the same module as ConsumerSpec and ProducerSpec) should be enhanced if necessary to support parameter template substitution - The ParametersSpec class (located in the same module as ConsumerSpec and ProducerSpec) should be enhanced if necessary to support parameter template substitution
- Processors should be able to invoke SettingsSpec to configure their settings with parameter values resolved at flow launch time - Processors should be able to invoke ParametersSpec to configure their parameters with parameter values resolved at flow launch time
- The SettingsSpec implementation needs to: - The ParametersSpec implementation needs to:
- Accept settings configurations that contain parameter placeholders (e.g., `{model}`, `{temperature}`) - Accept parameters configurations that contain parameter placeholders (e.g., `{model}`, `{temperature}`)
- Support runtime parameter substitution when the processor is instantiated - Support runtime parameter substitution when the processor is instantiated
- Validate that substituted values match expected types and constraints - Validate that substituted values match expected types and constraints
- Provide error handling for missing or invalid parameter references - Provide error handling for missing or invalid parameter references
#### Substitution Rules #### Substitution Rules
- Parameters use the format `{parameter-name}` in processor settings - Parameters use the format `{parameter-name}` in processor parameters
- Parameter names in settings match the keys in the flow's `parameters` section - Parameter names in parameters match the keys in the flow's `parameters` section
- Substitution occurs alongside `{id}` and `{class}` replacement - Substitution occurs alongside `{id}` and `{class}` replacement
- Invalid parameter references result in launch-time errors - Invalid parameter references result in launch-time errors
- Type validation happens based on the centrally-stored parameter definition - Type validation happens based on the centrally-stored parameter definition
@ -288,15 +287,15 @@ The system will:
Example resolution: Example resolution:
``` ```
Flow parameter mapping: "model": "llm-model" Flow parameter mapping: "model": "llm-model"
Processor setting: "model": "{model}" Processor parameter: "model": "{model}"
User provides: "model": "gemma3:8b" User provides: "model": "gemma3:8b"
Final setting: "model": "gemma3:8b" Final parameter: "model": "gemma3:8b"
``` ```
## Testing Strategy ## Testing Strategy
- Unit tests for parameter schema validation - Unit tests for parameter schema validation
- Integration tests for parameter substitution in processor settings - Integration tests for parameter substitution in processor parameters
- End-to-end tests for launching flows with different parameter values - End-to-end tests for launching flows with different parameter values
- UI tests for parameter form generation and validation - UI tests for parameter form generation and validation
- Performance tests for flows with many parameters - Performance tests for flows with many parameters
@ -317,8 +316,8 @@ A: The parameter values will be string encoded, we're probably going to want
to stick to strings. to stick to strings.
Q: Should parameter placeholders be allowed in queue names or only in Q: Should parameter placeholders be allowed in queue names or only in
settings? parameters?
A: Only in settings to remove strange injections and edge-cases. A: Only in parameters to remove strange injections and edge-cases.
Q: How to handle conflicts between parameter names and system variables like Q: How to handle conflicts between parameter names and system variables like
`id` and `class`? `id` and `class`?

View file

@ -8,7 +8,7 @@ from . subscriber import Subscriber
from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics
from . flow_processor import FlowProcessor from . flow_processor import FlowProcessor
from . consumer_spec import ConsumerSpec from . consumer_spec import ConsumerSpec
from . setting_spec import SettingSpec from . parameter_spec import ParameterSpec
from . producer_spec import ProducerSpec from . producer_spec import ProducerSpec
from . subscriber_spec import SubscriberSpec from . subscriber_spec import SubscriberSpec
from . request_response_spec import RequestResponseSpec from . request_response_spec import RequestResponseSpec

View file

@ -12,7 +12,7 @@ class Flow:
# Consumers and publishers. Is this a bit untidy? # Consumers and publishers. Is this a bit untidy?
self.consumer = {} self.consumer = {}
self.setting = {} self.parameter = {}
for spec in processor.specifications: for spec in processor.specifications:
spec.add(self, processor, defn) spec.add(self, processor, defn)
@ -28,5 +28,5 @@ class Flow:
def __call__(self, key): def __call__(self, key):
if key in self.producer: return self.producer[key] if key in self.producer: return self.producer[key]
if key in self.consumer: return self.consumer[key] if key in self.consumer: return self.consumer[key]
if key in self.setting: return self.setting[key].value if key in self.parameter: return self.parameter[key].value
return None return None

View file

@ -35,7 +35,7 @@ class FlowProcessor(AsyncProcessor):
# These can be overriden by a derived class: # These can be overriden by a derived class:
# Array of specifications: ConsumerSpec, ProducerSpec, SettingSpec # Array of specifications: ConsumerSpec, ProducerSpec, ParameterSpec
self.specifications = [] self.specifications = []
logger.info("Service initialised.") logger.info("Service initialised.")

View file

@ -9,7 +9,7 @@ from prometheus_client import Histogram
from .. schema import TextCompletionRequest, TextCompletionResponse, Error from .. schema import TextCompletionRequest, TextCompletionResponse, Error
from .. exceptions import TooManyRequests from .. exceptions import TooManyRequests
from .. base import FlowProcessor, ConsumerSpec, ProducerSpec from .. base import FlowProcessor, ConsumerSpec, ProducerSpec, ParameterSpec
# Module logger # Module logger
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -56,6 +56,12 @@ class LlmService(FlowProcessor):
) )
) )
self.register_specification(
ParameterSpec(
name = "model",
)
)
if not hasattr(__class__, "text_completion_metric"): if not hasattr(__class__, "text_completion_metric"):
__class__.text_completion_metric = Histogram( __class__.text_completion_metric = Histogram(
'text_completion_duration', 'text_completion_duration',
@ -74,6 +80,11 @@ class LlmService(FlowProcessor):
try: try:
try:
logger.debug(f"MODEL IS {flow('model')}")
except:
logger.debug(f"CAN'T GET MODEL")
request = msg.value() request = msg.value()
# Sender-produced ID # Sender-produced ID

View file

@ -1,7 +1,7 @@
from . spec import Spec from . spec import Spec
class Setting: class Parameter:
def __init__(self, value): def __init__(self, value):
self.value = value self.value = value
async def start(): async def start():
@ -9,11 +9,13 @@ class Setting:
async def stop(): async def stop():
pass pass
class SettingSpec(Spec): class ParameterSpec(Spec):
def __init__(self, name): def __init__(self, name):
self.name = name self.name = name
def add(self, flow, processor, definition): def add(self, flow, processor, definition):
flow.config[self.name] = Setting(definition[self.name]) value = definition.get(self.name, None)
flow.parameter[self.name] = Parameter(value)

View file

@ -86,13 +86,13 @@ class FlowConfig:
if msg.flow_id is None: if msg.flow_id is None:
raise RuntimeError("No flow ID") raise RuntimeError("No flow ID")
if msg.flow_id in await self.config.get("flows").values(): if msg.flow_id in await self.config.get("flows").keys():
raise RuntimeError("Flow already exists") raise RuntimeError("Flow already exists")
if msg.description is None: if msg.description is None:
raise RuntimeError("No description") raise RuntimeError("No description")
if msg.class_name not in await self.config.get("flow-classes").values(): if msg.class_name not in await self.config.get("flow-classes").keys():
raise RuntimeError("Class does not exist") raise RuntimeError("Class does not exist")
cls = json.loads( cls = json.loads(
@ -104,6 +104,7 @@ class FlowConfig:
# Apply parameter substitution to template replacement function # Apply parameter substitution to template replacement function
def repl_template_with_params(tmp): def repl_template_with_params(tmp):
result = tmp.replace( result = tmp.replace(
"{class}", msg.class_name "{class}", msg.class_name
).replace( ).replace(
@ -112,6 +113,7 @@ class FlowConfig:
# Apply parameter substitutions # Apply parameter substitutions
for param_name, param_value in parameters.items(): for param_name, param_value in parameters.items():
result = result.replace(f"{{{param_name}}}", str(param_value)) result = result.replace(f"{{{param_name}}}", str(param_value))
return result return result
for kind in ("class", "flow"): for kind in ("class", "flow"):
@ -127,12 +129,17 @@ class FlowConfig:
for k2, v2 in v.items() for k2, v2 in v.items()
} }
flac = await self.config.get("flows-active").values() flac = await self.config.get("flows-active").get(processor)
if processor in flac: if flac is not None:
target = json.loads(flac[processor]) target = json.loads(flac)
else: else:
target = {} target = {}
# The condition if variant not in target: means it only adds
# the configuration if the variant doesn't already exist.
# If "everything" already exists in the target with old
# values, they won't update.
if variant not in target: if variant not in target:
target[variant] = v target[variant] = v
@ -212,10 +219,10 @@ class FlowConfig:
variant = repl_template(variant) variant = repl_template(variant)
flac = await self.config.get("flows-active").values() flac = await self.config.get("flows-active").get(processor)
if processor in flac: if flac is not None:
target = json.loads(flac[processor]) target = json.loads(flac)
else: else:
target = {} target = {}
@ -226,7 +233,7 @@ class FlowConfig:
processor, json.dumps(target) processor, json.dumps(target)
) )
if msg.flow_id in await self.config.get("flows").values(): if msg.flow_id in await self.config.get("flows").keys():
await self.config.get("flows").delete(msg.flow_id) await self.config.get("flows").delete(msg.flow_id)
await self.config.inc_version() await self.config.inc_version()

View file

@ -145,7 +145,7 @@ class ConfigTableStore:
""") """)
self.get_all_stmt = self.cassandra.prepare(""" self.get_all_stmt = self.cassandra.prepare("""
SELECT class, key, value FROM config; SELECT class AS cls, key, value FROM config;
""") """)
self.get_values_stmt = self.cassandra.prepare(""" self.get_values_stmt = self.cassandra.prepare("""