diff --git a/Makefile b/Makefile index cb7b1526..c7b4797f 100644 --- a/Makefile +++ b/Makefile @@ -70,6 +70,8 @@ some-containers: -t ${CONTAINER_BASE}/trustgraph-base:${VERSION} . ${DOCKER} build -f containers/Containerfile.flow \ -t ${CONTAINER_BASE}/trustgraph-flow:${VERSION} . + ${DOCKER} build -f containers/Containerfile.vertexai \ + -t ${CONTAINER_BASE}/trustgraph-vertexai:${VERSION} . # ${DOCKER} build -f containers/Containerfile.mcp \ # -t ${CONTAINER_BASE}/trustgraph-mcp:${VERSION} . # ${DOCKER} build -f containers/Containerfile.vertexai \ diff --git a/docs/tech-specs/flow-configurable-parameters.md b/docs/tech-specs/flow-configurable-parameters.md index 1f48b853..8441a19e 100644 --- a/docs/tech-specs/flow-configurable-parameters.md +++ b/docs/tech-specs/flow-configurable-parameters.md @@ -2,43 +2,43 @@ ## Overview -This specification describes the implementation of configurable parameters for flow classes in TrustGraph. Parameters enable users to customize processor settings at flow launch time by providing values that replace parameter placeholders in the flow class definition. +This specification describes the implementation of configurable parameters for flow classes in TrustGraph. Parameters enable users to customize processor parameters at flow launch time by providing values that replace parameter placeholders in the flow class definition. -Parameters work through template variable substitution in processor settings, similar to how `{id}` and `{class}` variables work, but with user-provided values. +Parameters work through template variable substitution in processor parameters, similar to how `{id}` and `{class}` variables work, but with user-provided values. The integration supports four primary use cases: 1. **Model Selection**: Allowing users to choose different LLM models (e.g., `gemma3:8b`, `gpt-4`, `claude-3`) for processors -2. **Resource Configuration**: Adjusting processor settings like chunk sizes, batch sizes, and concurrency limits +2. **Resource Configuration**: Adjusting processor parameters like chunk sizes, batch sizes, and concurrency limits 3. **Behavioral Tuning**: Modifying processor behavior through parameters like temperature, max-tokens, or retrieval thresholds -4. **Environment-Specific Settings**: Configuring endpoints, API keys, or region-specific URLs per deployment +4. **Environment-Specific Parameters**: Configuring endpoints, API keys, or region-specific URLs per deployment ## Goals -- **Dynamic Processor Configuration**: Enable runtime configuration of processor settings through parameter substitution +- **Dynamic Processor Configuration**: Enable runtime configuration of processor parameters through parameter substitution - **Parameter Validation**: Provide type checking and validation for parameters at flow launch time - **Default Values**: Support sensible defaults while allowing overrides for advanced users -- **Template Substitution**: Seamlessly replace parameter placeholders in processor settings +- **Template Substitution**: Seamlessly replace parameter placeholders in processor parameters - **UI Integration**: Enable parameter input through both API and UI interfaces -- **Type Safety**: Ensure parameter types match expected processor setting types +- **Type Safety**: Ensure parameter types match expected processor parameter types - **Documentation**: Self-documenting parameter schemas within flow class definitions - **Backward Compatibility**: Maintain compatibility with existing flow classes that don't use parameters ## Background -Flow classes in TrustGraph now support processor settings that can contain either fixed values or parameter placeholders. This creates an opportunity for runtime customization. +Flow classes in TrustGraph now support processor parameters that can contain either fixed values or parameter placeholders. This creates an opportunity for runtime customization. -Current processor settings support: +Current processor parameters support: - Fixed values: `"model": "gemma3:12b"` - Parameter placeholders: `"model": "gemma3:{model-size}"` This specification defines how parameters are: - Declared in flow class definitions - Validated when flows are launched -- Substituted in processor settings +- Substituted in processor parameters - Exposed through APIs and UI -By leveraging parameterized processor settings, TrustGraph can: +By leveraging parameterized processor parameters, TrustGraph can: - Reduce flow class duplication by using parameters for variations - Enable users to tune processor behavior without modifying definitions - Support environment-specific configurations through parameter values @@ -144,7 +144,7 @@ Flow classes reference parameter definitions by name: "text-completion:{class}": { "request": "non-persistent://tg/request/text-completion:{class}", "response": "non-persistent://tg/response/text-completion:{class}", - "settings": { + "parameters": { "model": "{model}", "temperature": "{temp}" } @@ -154,7 +154,7 @@ Flow classes reference parameter definitions by name: "chunker:{id}": { "input": "persistent://tg/flow/chunk:{id}", "output": "persistent://tg/flow/chunk-load:{id}", - "settings": { + "parameters": { "chunk_size": "{chunk}", "chunk_overlap": 100 } @@ -193,7 +193,7 @@ The flow launch API accepts parameters using the flow's parameter names: The system will: 1. Map flow parameter names to their definitions (e.g., `model` → `llm-model`) 2. Validate values against the parameter definitions -3. Substitute values into processor settings during flow instantiation +3. Substitute values into processor parameters during flow instantiation ### Implementation Details @@ -203,8 +203,8 @@ The system will: 2. **Definition Lookup**: Retrieve parameter definitions from schema/config store 3. **Validation**: Validate user-provided parameters against definitions 4. **Default Application**: Apply default values for missing parameters -5. **Template Substitution**: Replace parameter placeholders in processor settings -6. **Processor Instantiation**: Create processors with substituted settings +5. **Template Substitution**: Replace parameter placeholders in processor parameters +6. **Processor Instantiation**: Create processors with substituted parameters #### Pulsar Integration @@ -250,7 +250,6 @@ The system will: - The config manager needs to store both the original user-provided parameters and the resolved values (with defaults applied) - Flow objects in the config system should include: - `parameters`: The final resolved parameter values used for the flow - - `parameter_definitions`: Reference to the parameter schema used for validation #### CLI Integration @@ -267,20 +266,20 @@ The system will: #### Processor Base Class Integration -5. **SettingsSpec Support** - - Processor base classes need to support parameter substitution through the existing SettingsSpec mechanism - - The SettingsSpec class (located in the same module as ConsumerSpec and ProducerSpec) should be enhanced if necessary to support parameter template substitution - - Processors should be able to invoke SettingsSpec to configure their settings with parameter values resolved at flow launch time - - The SettingsSpec implementation needs to: - - Accept settings configurations that contain parameter placeholders (e.g., `{model}`, `{temperature}`) +5. **ParameterSpec Support** + - Processor base classes need to support parameter substitution through the existing ParametersSpec mechanism + - The ParametersSpec class (located in the same module as ConsumerSpec and ProducerSpec) should be enhanced if necessary to support parameter template substitution + - Processors should be able to invoke ParametersSpec to configure their parameters with parameter values resolved at flow launch time + - The ParametersSpec implementation needs to: + - Accept parameters configurations that contain parameter placeholders (e.g., `{model}`, `{temperature}`) - Support runtime parameter substitution when the processor is instantiated - Validate that substituted values match expected types and constraints - Provide error handling for missing or invalid parameter references #### Substitution Rules -- Parameters use the format `{parameter-name}` in processor settings -- Parameter names in settings match the keys in the flow's `parameters` section +- Parameters use the format `{parameter-name}` in processor parameters +- Parameter names in parameters match the keys in the flow's `parameters` section - Substitution occurs alongside `{id}` and `{class}` replacement - Invalid parameter references result in launch-time errors - Type validation happens based on the centrally-stored parameter definition @@ -288,15 +287,15 @@ The system will: Example resolution: ``` Flow parameter mapping: "model": "llm-model" -Processor setting: "model": "{model}" +Processor parameter: "model": "{model}" User provides: "model": "gemma3:8b" -Final setting: "model": "gemma3:8b" +Final parameter: "model": "gemma3:8b" ``` ## Testing Strategy - Unit tests for parameter schema validation -- Integration tests for parameter substitution in processor settings +- Integration tests for parameter substitution in processor parameters - End-to-end tests for launching flows with different parameter values - UI tests for parameter form generation and validation - Performance tests for flows with many parameters @@ -317,8 +316,8 @@ A: The parameter values will be string encoded, we're probably going to want to stick to strings. Q: Should parameter placeholders be allowed in queue names or only in - settings? -A: Only in settings to remove strange injections and edge-cases. + parameters? +A: Only in parameters to remove strange injections and edge-cases. Q: How to handle conflicts between parameter names and system variables like `id` and `class`? diff --git a/trustgraph-base/trustgraph/base/__init__.py b/trustgraph-base/trustgraph/base/__init__.py index 7ef199d3..0bdf1f7a 100644 --- a/trustgraph-base/trustgraph/base/__init__.py +++ b/trustgraph-base/trustgraph/base/__init__.py @@ -8,7 +8,7 @@ from . subscriber import Subscriber from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics from . flow_processor import FlowProcessor from . consumer_spec import ConsumerSpec -from . setting_spec import SettingSpec +from . parameter_spec import ParameterSpec from . producer_spec import ProducerSpec from . subscriber_spec import SubscriberSpec from . request_response_spec import RequestResponseSpec diff --git a/trustgraph-base/trustgraph/base/flow.py b/trustgraph-base/trustgraph/base/flow.py index 9cda34a0..cf88cbfd 100644 --- a/trustgraph-base/trustgraph/base/flow.py +++ b/trustgraph-base/trustgraph/base/flow.py @@ -12,7 +12,7 @@ class Flow: # Consumers and publishers. Is this a bit untidy? self.consumer = {} - self.setting = {} + self.parameter = {} for spec in processor.specifications: spec.add(self, processor, defn) @@ -28,5 +28,5 @@ class Flow: def __call__(self, key): if key in self.producer: return self.producer[key] if key in self.consumer: return self.consumer[key] - if key in self.setting: return self.setting[key].value + if key in self.parameter: return self.parameter[key].value return None diff --git a/trustgraph-base/trustgraph/base/flow_processor.py b/trustgraph-base/trustgraph/base/flow_processor.py index 6d3ba64f..385e1346 100644 --- a/trustgraph-base/trustgraph/base/flow_processor.py +++ b/trustgraph-base/trustgraph/base/flow_processor.py @@ -35,7 +35,7 @@ class FlowProcessor(AsyncProcessor): # These can be overriden by a derived class: - # Array of specifications: ConsumerSpec, ProducerSpec, SettingSpec + # Array of specifications: ConsumerSpec, ProducerSpec, ParameterSpec self.specifications = [] logger.info("Service initialised.") diff --git a/trustgraph-base/trustgraph/base/llm_service.py b/trustgraph-base/trustgraph/base/llm_service.py index 37b0e1c2..61e48d2f 100644 --- a/trustgraph-base/trustgraph/base/llm_service.py +++ b/trustgraph-base/trustgraph/base/llm_service.py @@ -9,7 +9,7 @@ from prometheus_client import Histogram from .. schema import TextCompletionRequest, TextCompletionResponse, Error from .. exceptions import TooManyRequests -from .. base import FlowProcessor, ConsumerSpec, ProducerSpec +from .. base import FlowProcessor, ConsumerSpec, ProducerSpec, ParameterSpec # Module logger logger = logging.getLogger(__name__) @@ -56,6 +56,12 @@ class LlmService(FlowProcessor): ) ) + self.register_specification( + ParameterSpec( + name = "model", + ) + ) + if not hasattr(__class__, "text_completion_metric"): __class__.text_completion_metric = Histogram( 'text_completion_duration', @@ -74,6 +80,11 @@ class LlmService(FlowProcessor): try: + try: + logger.debug(f"MODEL IS {flow('model')}") + except: + logger.debug(f"CAN'T GET MODEL") + request = msg.value() # Sender-produced ID diff --git a/trustgraph-base/trustgraph/base/setting_spec.py b/trustgraph-base/trustgraph/base/parameter_spec.py similarity index 64% rename from trustgraph-base/trustgraph/base/setting_spec.py rename to trustgraph-base/trustgraph/base/parameter_spec.py index 5c5152b2..9f076ceb 100644 --- a/trustgraph-base/trustgraph/base/setting_spec.py +++ b/trustgraph-base/trustgraph/base/parameter_spec.py @@ -1,7 +1,7 @@ from . spec import Spec -class Setting: +class Parameter: def __init__(self, value): self.value = value async def start(): @@ -9,11 +9,13 @@ class Setting: async def stop(): pass -class SettingSpec(Spec): +class ParameterSpec(Spec): def __init__(self, name): self.name = name def add(self, flow, processor, definition): - flow.config[self.name] = Setting(definition[self.name]) + value = definition.get(self.name, None) + + flow.parameter[self.name] = Parameter(value) diff --git a/trustgraph-flow/trustgraph/config/service/flow.py b/trustgraph-flow/trustgraph/config/service/flow.py index 5789ef0d..6b55c36f 100644 --- a/trustgraph-flow/trustgraph/config/service/flow.py +++ b/trustgraph-flow/trustgraph/config/service/flow.py @@ -86,13 +86,13 @@ class FlowConfig: if msg.flow_id is None: raise RuntimeError("No flow ID") - if msg.flow_id in await self.config.get("flows").values(): + if msg.flow_id in await self.config.get("flows").keys(): raise RuntimeError("Flow already exists") if msg.description is None: raise RuntimeError("No description") - if msg.class_name not in await self.config.get("flow-classes").values(): + if msg.class_name not in await self.config.get("flow-classes").keys(): raise RuntimeError("Class does not exist") cls = json.loads( @@ -104,6 +104,7 @@ class FlowConfig: # Apply parameter substitution to template replacement function def repl_template_with_params(tmp): + result = tmp.replace( "{class}", msg.class_name ).replace( @@ -112,6 +113,7 @@ class FlowConfig: # Apply parameter substitutions for param_name, param_value in parameters.items(): result = result.replace(f"{{{param_name}}}", str(param_value)) + return result for kind in ("class", "flow"): @@ -127,12 +129,17 @@ class FlowConfig: for k2, v2 in v.items() } - flac = await self.config.get("flows-active").values() - if processor in flac: - target = json.loads(flac[processor]) + flac = await self.config.get("flows-active").get(processor) + if flac is not None: + target = json.loads(flac) else: target = {} + # The condition if variant not in target: means it only adds + # the configuration if the variant doesn't already exist. + # If "everything" already exists in the target with old + # values, they won't update. + if variant not in target: target[variant] = v @@ -212,10 +219,10 @@ class FlowConfig: variant = repl_template(variant) - flac = await self.config.get("flows-active").values() + flac = await self.config.get("flows-active").get(processor) - if processor in flac: - target = json.loads(flac[processor]) + if flac is not None: + target = json.loads(flac) else: target = {} @@ -226,7 +233,7 @@ class FlowConfig: processor, json.dumps(target) ) - if msg.flow_id in await self.config.get("flows").values(): + if msg.flow_id in await self.config.get("flows").keys(): await self.config.get("flows").delete(msg.flow_id) await self.config.inc_version() diff --git a/trustgraph-flow/trustgraph/tables/config.py b/trustgraph-flow/trustgraph/tables/config.py index a991de18..f98929e1 100644 --- a/trustgraph-flow/trustgraph/tables/config.py +++ b/trustgraph-flow/trustgraph/tables/config.py @@ -145,7 +145,7 @@ class ConfigTableStore: """) self.get_all_stmt = self.cassandra.prepare(""" - SELECT class, key, value FROM config; + SELECT class AS cls, key, value FROM config; """) self.get_values_stmt = self.cassandra.prepare("""