Merge branch 'release/v1.2'

Cyber MacGeddon 2025-08-26 19:18:01 +01:00
commit 0bff629f87
28 changed files with 3881 additions and 111 deletions

@ -31,9 +31,9 @@ The command updates both the tool index and stores the complete tool configurati
- Must be unique within the tool registry
- `--name NAME`
-  - **Required.** Human-readable name for the tool
-  - Displayed in tool listings and user interfaces
-  - Should be descriptive and clear
+  - **Required.** Tool name used by agents to invoke this tool
+  - Must be a valid function identifier (use snake_case, no spaces or special characters)
+  - Examples: `get_weather`, `calculate_distance`, `search_documents`
- `--type TYPE`
- **Required.** Tool type defining its functionality
@ -63,7 +63,7 @@ The command updates both the tool index and stores the complete tool configurati
Register a simple weather lookup tool:
```bash
-tg-set-tool --id weather --name "Weather Lookup" \
+tg-set-tool --id weather_tool --name get_weather \
--type knowledge-query \
--description "Get current weather information" \
--argument location:string:"Location to query" \
@ -74,7 +74,8 @@ tg-set-tool --id weather --name "Weather Lookup" \
Register a calculator tool with MCP type:
```bash
-tg-set-tool --id calculator --name "Calculator" --type mcp-tool \
+tg-set-tool --id calc_tool --name calculate \
+  --type mcp-tool \
--description "Perform mathematical calculations" \
--argument expression:string:"Mathematical expression to evaluate"
```
@ -83,7 +84,7 @@ tg-set-tool --id calculator --name "Calculator" --type mcp-tool \
Register a text completion tool:
```bash
-tg-set-tool --id text-generator --name "Text Generator" \
+tg-set-tool --id text_gen_tool --name generate_text \
--type text-completion \
--description "Generate text based on prompts" \
--argument prompt:string:"Text prompt for generation" \
@ -95,7 +96,7 @@ tg-set-tool --id text-generator --name "Text Generator" \
Register a tool with custom API endpoint:
```bash
tg-set-tool -u http://trustgraph.example.com:8088/ \
-  --id custom-tool --name "Custom Tool" \
+  --id custom_tool --name custom_search \
--type knowledge-query \
--description "Custom tool functionality"
```
@ -104,7 +105,7 @@ tg-set-tool -u http://trustgraph.example.com:8088/ \
Register a simple tool with no arguments:
```bash
-tg-set-tool --id status-check --name "Status Check" \
+tg-set-tool --id status_tool --name check_status \
--type knowledge-query \
--description "Check system status"
```
@ -318,4 +319,4 @@ If tools aren't working as expected:
- TrustGraph Tool Development Guide
- Agent Configuration Documentation
-- MCP Tool Integration Guide
+- MCP Tool Integration Guide

@ -0,0 +1,127 @@
# Command-Line Loading Knowledge Technical Specification
## Overview
This specification describes the command-line interfaces for loading knowledge into TrustGraph, enabling users to ingest data from various sources through command-line tools. The integration supports four primary use cases:
1. **[Use Case 1]**: [Description]
2. **[Use Case 2]**: [Description]
3. **[Use Case 3]**: [Description]
4. **[Use Case 4]**: [Description]
## Goals
- **[Goal 1]**: [Description]
- **[Goal 2]**: [Description]
- **[Goal 3]**: [Description]
- **[Goal 4]**: [Description]
- **[Goal 5]**: [Description]
- **[Goal 6]**: [Description]
- **[Goal 7]**: [Description]
- **[Goal 8]**: [Description]
## Background
[Describe the current state and limitations that this specification addresses]
Current limitations include:
- [Limitation 1]
- [Limitation 2]
- [Limitation 3]
- [Limitation 4]
This specification addresses these gaps by [description]. By [capability], TrustGraph can:
- [Benefit 1]
- [Benefit 2]
- [Benefit 3]
- [Benefit 4]
## Technical Design
### Architecture
The command-line knowledge loading requires the following technical components:
1. **[Component 1]**
- [Description of component functionality]
- [Key features]
- [Integration points]
Module: [module-path]
2. **[Component 2]**
- [Description of component functionality]
- [Key features]
- [Integration points]
Module: [module-path]
3. **[Component 3]**
- [Description of component functionality]
- [Key features]
- [Integration points]
Module: [module-path]
### Data Models
#### [Data Model 1]
[Description of data model and structure]
Example:
```
[Example data structure]
```
This approach allows:
- [Benefit 1]
- [Benefit 2]
- [Benefit 3]
- [Benefit 4]
### APIs
New APIs:
- [API description 1]
- [API description 2]
- [API description 3]
Modified APIs:
- [Modified API 1] - [Description of changes]
- [Modified API 2] - [Description of changes]
### Implementation Details
[Implementation approach and conventions]
[Additional implementation notes]
## Security Considerations
[Security considerations specific to this implementation]
## Performance Considerations
[Performance considerations and potential bottlenecks]
## Testing Strategy
[Testing approach and strategy]
## Migration Plan
[Migration strategy if applicable]
## Timeline
[Timeline information if specified]
## Open Questions
- [Open question 1]
- [Open question 2]
## References
[References if applicable]

@ -0,0 +1,106 @@
# Knowledge Graph Architecture Foundations
## Foundation 1: Subject-Predicate-Object (SPO) Graph Model
**Decision**: Adopt SPO/RDF as the core knowledge representation model
**Rationale**:
- Provides maximum flexibility and interoperability with existing graph technologies
- Enables seamless translation to other graph query languages (e.g., SPO → Cypher, but not vice versa)
- Creates a foundation that "unlocks a lot" of downstream capabilities
- Supports both node-to-node relationships (SPO) and node-to-literal relationships (RDF)
**Implementation**:
- Core data structure: `node → edge → {node | literal}`
- Maintain compatibility with RDF standards while supporting extended SPO operations
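The `node → edge → {node | literal}` structure above can be sketched as a small set of Python types. This is an illustrative sketch only: the class names `Node`, `Literal`, and `Triple`, the helper `is_node_to_node`, and the `ex:` identifiers are assumptions made for this example, not TrustGraph's actual types.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Node:
    """A graph entity identified by a URI (hypothetical type)."""
    uri: str


@dataclass(frozen=True)
class Literal:
    """A plain value such as a name or a number rendered as text (hypothetical type)."""
    value: str


@dataclass(frozen=True)
class Triple:
    """One statement: subject node, predicate (edge), and a node or literal object."""
    subject: Node
    predicate: Node
    obj: Union[Node, Literal]


def is_node_to_node(triple: Triple) -> bool:
    """True for node-to-node relationships, False for node-to-literal ones."""
    return isinstance(triple.obj, Node)


triples = [
    # node -> edge -> node
    Triple(Node("ex:alice"), Node("ex:knows"), Node("ex:bob")),
    # node -> edge -> literal
    Triple(Node("ex:alice"), Node("ex:name"), Literal("Alice")),
]
```

Keeping the object position as a union of node and literal lets a single triple type cover both relationship kinds described in the rationale.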
## Foundation 2: LLM-Native Knowledge Graph Integration
**Decision**: Optimize knowledge graph structure and operations for LLM interaction
**Rationale**:
- Primary use case involves LLMs interfacing with knowledge graphs
- Graph technology choices must prioritize LLM compatibility over other considerations
- Enables natural language processing workflows that leverage structured knowledge
**Implementation**:
- Design graph schemas that LLMs can effectively reason about
- Optimize for common LLM interaction patterns
## Foundation 3: Embedding-Based Graph Navigation
**Decision**: Implement direct mapping from natural language queries to graph nodes via embeddings
**Rationale**:
- Enables the simplest possible path from NLP query to graph navigation
- Avoids complex intermediate query generation steps
- Provides efficient semantic search capabilities within the graph structure
**Implementation**:
- `NLP Query → Graph Embeddings → Graph Nodes`
- Maintain embedding representations for all graph entities
- Support direct semantic similarity matching for query resolution
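As a rough illustration of the `NLP Query → Graph Embeddings → Graph Nodes` path, the sketch below ranks nodes by cosine similarity to a query vector. The vectors, node identifiers, and the `nearest_nodes` helper are made up for this example; a real deployment would obtain embeddings from an embedding model and would typically use a vector store rather than a linear scan.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def nearest_nodes(query_vec, node_embeddings, k=2):
    """Return the k node identifiers whose embeddings are most similar to the query."""
    ranked = sorted(
        node_embeddings.items(),
        key=lambda item: cosine(query_vec, item[1]),
        reverse=True,
    )
    return [node for node, _ in ranked[:k]]


# Toy embeddings, invented for illustration only
node_embeddings = {
    "ex:paris": [0.9, 0.1, 0.0],
    "ex:london": [0.8, 0.2, 0.1],
    "ex:banana": [0.0, 0.1, 0.9],
}
query_vec = [1.0, 0.0, 0.0]  # stand-in for an embedded natural language query
matches = nearest_nodes(query_vec, node_embeddings)
```

The returned node identifiers would then serve as entry points for graph traversal, with no intermediate query language generated.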
## Foundation 4: Distributed Entity Resolution with Deterministic Identifiers
**Decision**: Support parallel knowledge extraction with deterministic entity identification (80% rule)
**Rationale**:
- **Ideal**: Single-process extraction with complete state visibility enables perfect entity resolution
- **Reality**: Scalability requirements demand parallel processing capabilities
- **Compromise**: Design for deterministic entity identification across distributed processes
**Implementation**:
- Develop mechanisms for generating consistent, unique identifiers across different knowledge extractors
- Same entity mentioned in different processes must resolve to the same identifier
- Acknowledge that ~20% of edge cases may require alternative processing models
- Design fallback mechanisms for complex entity resolution scenarios
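One way to obtain the same identifier from independent extractors is to derive it purely from a normalized form of the entity name, so that no shared state is required. The sketch below assumes this hashing approach; the `entity_id` function, the normalization rules, and the `https://example.org/e/` namespace are illustrative choices, not the mechanism TrustGraph necessarily uses.

```python
import hashlib
import unicodedata


def entity_id(name: str, namespace: str = "https://example.org/e/") -> str:
    """Derive a deterministic identifier from an entity name.

    Normalization (Unicode NFKC, case folding, whitespace collapsing) makes
    trivially different spellings resolve to the same identifier.
    """
    normalized = unicodedata.normalize("NFKC", name)
    normalized = " ".join(normalized.casefold().split())
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16]
    return namespace + digest


# Two extractors seeing slightly different surface forms agree on the identifier
id_a = entity_id("Marie  Curie")
id_b = entity_id("marie curie")
id_c = entity_id("Pierre Curie")
```

Name-based hashing cannot merge genuinely different names for the same entity (aliases, abbreviations), which is the kind of edge case the ~20% allowance above leaves to fallback mechanisms.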
## Foundation 5: Event-Driven Architecture with Publish-Subscribe
**Decision**: Implement pub-sub messaging system for system coordination
**Rationale**:
- Enables loose coupling between knowledge extraction, storage, and query components
- Supports real-time updates and notifications across the system
- Facilitates scalable, distributed processing workflows
**Implementation**:
- Message-driven coordination between system components
- Event streams for knowledge updates, extraction completion, and query results
## Foundation 6: Reentrant Agent Communication
**Decision**: Support reentrant pub-sub operations for agent-based processing
**Rationale**:
- Enables sophisticated agent workflows where agents can trigger and respond to each other
- Supports complex, multi-step knowledge processing pipelines
- Allows for recursive and iterative processing patterns
**Implementation**:
- Pub-sub system must handle reentrant calls safely
- Agent coordination mechanisms that prevent infinite loops
- Support for agent workflow orchestration
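The reentrancy and loop-prevention requirements can be illustrated with a minimal in-process bus. This is a sketch under stated assumptions: the `Bus` class, the hop counter, and the `max_hops` limit are invented for illustration, and a production system would rely on a real message broker rather than an in-memory queue.

```python
from collections import defaultdict, deque


class Bus:
    """Toy pub-sub bus that tolerates handlers publishing while being dispatched."""

    def __init__(self, max_hops=5):
        self.subscribers = defaultdict(list)
        self.queue = deque()
        self.max_hops = max_hops
        self.dropped = 0
        self._draining = False

    def subscribe(self, topic, handler):
        self.subscribers[topic].append(handler)

    def publish(self, topic, payload, hops=0):
        # Loop prevention: refuse messages that have been relayed too many times
        if hops > self.max_hops:
            self.dropped += 1
            return
        self.queue.append((topic, payload, hops))
        if self._draining:
            return  # reentrant call: the outer loop delivers the message
        self._draining = True
        try:
            while self.queue:
                t, p, h = self.queue.popleft()
                for handler in list(self.subscribers[t]):
                    handler(self, p, h)
        finally:
            self._draining = False


log = []


def ping_agent(bus, payload, hops):
    log.append(("ping", hops))
    bus.publish("pong", payload, hops + 1)


def pong_agent(bus, payload, hops):
    log.append(("pong", hops))
    bus.publish("ping", payload, hops + 1)


bus = Bus(max_hops=3)
bus.subscribe("ping", ping_agent)
bus.subscribe("pong", pong_agent)
bus.publish("ping", "hello")
```

Queuing reentrant publishes instead of dispatching them recursively keeps the call stack flat, and the hop counter bounds agent chains that would otherwise trigger each other indefinitely.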
## Foundation 7: Columnar Data Store Integration
**Decision**: Ensure query compatibility with columnar storage systems
**Rationale**:
- Enables efficient analytical queries over large knowledge datasets
- Supports business intelligence and reporting use cases
- Bridges graph-based knowledge representation with traditional analytical workflows
**Implementation**:
- Query translation layer: Graph queries → Columnar queries
- Hybrid storage strategy supporting both graph operations and analytical workloads
- Maintain query performance across both paradigms
---
## Architecture Principles Summary
1. **Flexibility First**: SPO/RDF model provides maximum adaptability
2. **LLM Optimization**: All design decisions consider LLM interaction requirements
3. **Semantic Efficiency**: Direct embedding-to-node mapping for optimal query performance
4. **Pragmatic Scalability**: Balance perfect accuracy with practical distributed processing
5. **Event-Driven Coordination**: Pub-sub enables loose coupling and scalability
6. **Agent-Friendly**: Support complex, multi-agent processing workflows
7. **Analytical Compatibility**: Bridge graph and columnar paradigms for comprehensive querying
These foundations establish a knowledge graph architecture that balances theoretical rigor with practical scalability requirements, optimized for LLM integration and distributed processing.

@ -0,0 +1,169 @@
# TrustGraph Logging Strategy
## Overview
TrustGraph uses Python's built-in `logging` module for all logging operations. This provides a standardized, flexible approach to logging across all components of the system.
## Default Configuration
### Logging Level
- **Default Level**: `INFO`
- **Debug Mode**: `DEBUG` (enabled via command-line argument)
- **Production**: `WARNING` or `ERROR` as appropriate
### Output Destination
All logs should be written to **standard output (stdout)** to ensure compatibility with containerized environments and log aggregation systems.
## Implementation Guidelines
### 1. Logger Initialization
Each module should create its own logger using the module's `__name__`:
```python
import logging
logger = logging.getLogger(__name__)
```
### 2. Centralized Configuration
The logging configuration should be centralized in `async_processor.py` (or a dedicated logging configuration module) since it's inherited by much of the codebase:
```python
import argparse
import logging
import sys

def setup_logging(log_level='INFO'):
    """Configure logging for the entire application"""
    logging.basicConfig(
        level=getattr(logging, log_level.upper()),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        # StreamHandler defaults to stderr, so pass sys.stdout explicitly
        handlers=[logging.StreamHandler(sys.stdout)]
    )

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--log-level',
        default='INFO',
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
        help='Set the logging level (default: INFO)'
    )
    return parser.parse_args()

# In main execution
if __name__ == '__main__':
    args = parse_args()
    setup_logging(args.log_level)
```
### 3. Logging Best Practices
#### Log Levels Usage
- **DEBUG**: Detailed information for diagnosing problems (variable values, function entry/exit)
- **INFO**: General informational messages (service started, configuration loaded, processing milestones)
- **WARNING**: Warning messages for potentially harmful situations (deprecated features, recoverable errors)
- **ERROR**: Error messages for serious problems (failed operations, exceptions)
- **CRITICAL**: Critical messages for system failures requiring immediate attention
#### Message Format
```python
# Good - includes context
logger.info(f"Processing document: {doc_id}, size: {doc_size} bytes")
logger.error(f"Failed to connect to database: {error}", exc_info=True)
# Avoid - lacks context
logger.info("Processing document")
logger.error("Connection failed")
```
#### Performance Considerations
```python
# Use lazy %-style formatting: the message string is only built if the record is emitted
logger.debug("Processing document: %s", doc_id)

# Arguments are still evaluated eagerly, so guard expensive debug computations with a level check
if logger.isEnabledFor(logging.DEBUG):
    debug_data = compute_expensive_debug_info()
    logger.debug("Debug data: %s", debug_data)
```
### 4. Structured Logging
For complex data, use structured logging:
```python
logger.info("Request processed", extra={
    'request_id': request_id,
    'duration_ms': duration,
    'status_code': status_code,
    'user_id': user_id
})
```
### 5. Exception Logging
Always include stack traces for exceptions:
```python
try:
    process_data()
except Exception as e:
    logger.error(f"Failed to process data: {e}", exc_info=True)
    raise
```
### 6. Async Logging Considerations
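Python's `logging` module is thread-safe, so loggers can be used directly from async code. Including the task name helps correlate interleaved log lines: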
```python
import asyncio
import logging

async def async_operation():
    logger = logging.getLogger(__name__)
    logger.info(f"Starting async operation in task: {asyncio.current_task().get_name()}")
```
## Environment Variables
Support environment-based configuration as a fallback:
```python
import os
log_level = os.environ.get('TRUSTGRAPH_LOG_LEVEL', 'INFO')
```
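Treating the environment variable as a fallback suggests a precedence order of command-line argument first, then `TRUSTGRAPH_LOG_LEVEL`, then the `INFO` default. The sketch below assumes that order; the `resolve_log_level` helper is hypothetical and exists only to illustrate the idea.

```python
import argparse
import os

VALID_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']


def resolve_log_level(argv=None, environ=None):
    """Pick the log level: --log-level wins, then the environment, then INFO."""
    environ = os.environ if environ is None else environ
    parser = argparse.ArgumentParser()
    # default=None lets us tell "not given" apart from an explicit choice
    parser.add_argument('--log-level', default=None, choices=VALID_LEVELS)
    args, _unknown = parser.parse_known_args(argv)
    if args.log_level:
        return args.log_level
    env_level = environ.get('TRUSTGRAPH_LOG_LEVEL', '').upper()
    # Ignore unset or unrecognised values rather than failing at startup
    return env_level if env_level in VALID_LEVELS else 'INFO'
```

For example, `resolve_log_level(['--log-level', 'ERROR'], {'TRUSTGRAPH_LOG_LEVEL': 'DEBUG'})` selects `ERROR`, because the explicit argument takes priority over the environment.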
## Testing
During tests, consider using a different logging configuration:
```python
# In test setup
logging.getLogger().setLevel(logging.WARNING) # Reduce noise during tests
```
## Monitoring Integration
Ensure log format is compatible with monitoring tools:
- Include timestamps in ISO format
- Use consistent field names
- Include correlation IDs where applicable
- Structure logs for easy parsing (JSON format for production)
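A JSON formatter is one possible way to meet these points with the standard library alone. In the sketch below, the `JsonFormatter` class and the field names (`timestamp`, `level`, `logger`, `message`, `request_id`) are assumptions chosen for illustration, and an in-memory stream stands in for stdout so the output can be inspected.

```python
import io
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object per line."""

    def format(self, record):
        entry = {
            # ISO 8601 timestamp in UTC
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Correlation ID, if the caller supplied one via extra={...}
        if hasattr(record, "request_id"):
            entry["request_id"] = record.request_id
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry)


stream = io.StringIO()  # stand-in for sys.stdout in this example
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

demo_logger = logging.getLogger("json_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the demo output out of the root logger

demo_logger.info("Request processed", extra={"request_id": "req-42"})
```

Because the formatter is attached to the handler, application code keeps calling `logger.info(...)` as before and only the output representation changes.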
## Security Considerations
- Never log sensitive information (passwords, API keys, personal data)
- Sanitize user input before logging
- Use placeholders for sensitive fields: `user_id=****1234`
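As a safety net alongside these rules, a logging filter can mask obvious secrets before a record is written. The sketch below is illustrative only: the `RedactingFilter` class and the `key=value` pattern it matches are assumptions, and pattern-based redaction does not replace the rule of not logging sensitive data in the first place.

```python
import io
import logging
import re

# Matches key=value pairs for a few assumed sensitive key names
SECRET_PATTERN = re.compile(r"(password|api_key|token)=\S+", re.IGNORECASE)


class RedactingFilter(logging.Filter):
    """Replace the values of sensitive key=value pairs with a placeholder."""

    def filter(self, record):
        message = record.getMessage()  # message with %-arguments applied
        record.msg = SECRET_PATTERN.sub(lambda m: m.group(1) + "=****", message)
        record.args = None  # arguments are already merged into msg
        return True  # never drop the record, only rewrite it


stream = io.StringIO()  # stand-in for sys.stdout in this example
handler = logging.StreamHandler(stream)
handler.addFilter(RedactingFilter())

demo_logger = logging.getLogger("redact_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False

demo_logger.info("login attempt user=%s password=%s", "alice", "hunter2")
```

Attaching the filter to the handler rather than to individual loggers means every record routed through that handler is checked, regardless of which module emitted it.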
## Migration Path
For existing code using print statements:
1. Replace `print()` with appropriate logger calls
2. Choose appropriate log levels based on message importance
3. Add context to make logs more useful
4. Test logging output at different levels

@ -0,0 +1,256 @@
# MCP Tool Arguments Specification
## Overview
**Feature Name**: MCP Tool Arguments Support
**Author**: Claude Code Assistant
**Date**: 2025-08-21
**Status**: Finalised
### Executive Summary
Enable ReACT agents to invoke MCP (Model Context Protocol) tools with
properly defined arguments by adding argument specification support to
MCP tool configurations, similar to how prompt template tools
currently work.
### Problem Statement
Currently, MCP tools in the ReACT agent framework cannot specify their
expected arguments. The `McpToolImpl.get_arguments()` method returns
an empty list, forcing LLMs to guess the correct parameter structure
based only on tool names and descriptions. This leads to:
- Unreliable tool invocations due to parameter guessing
- Poor user experience when tools fail due to incorrect arguments
- No validation of tool parameters before execution
- Missing parameter documentation in agent prompts
### Goals
- [ ] Allow MCP tool configurations to specify expected arguments (name, type, description)
- [ ] Update agent manager to expose MCP tool arguments to LLMs via prompts
- [ ] Maintain backward compatibility with existing MCP tool configurations
- [ ] Support argument validation similar to prompt template tools
### Non-Goals
- Dynamic argument discovery from MCP servers (future enhancement)
- Argument type validation beyond basic structure
- Complex argument schemas (nested objects, arrays)
## Background and Context
### Current State
MCP tools are configured in the ReACT agent system with minimal metadata:
```json
{
"type": "mcp-tool",
"name": "get_bank_balance",
"description": "Get bank account balance",
"mcp-tool": "get_bank_balance"
}
```
The `McpToolImpl.get_arguments()` method returns `[]`, so LLMs receive no argument guidance in their prompts.
### Limitations
1. **No argument specification**: MCP tools cannot define expected parameters
2. **LLM parameter guessing**: Agents must infer parameters from tool names/descriptions
3. **Missing prompt information**: Agent prompts show no argument details for MCP tools
4. **No validation**: Invalid parameters are only caught at MCP tool execution time
### Related Components
- **trustgraph-flow/agent/react/service.py**: Tool configuration loading and AgentManager creation
- **trustgraph-flow/agent/react/tools.py**: McpToolImpl implementation
- **trustgraph-flow/agent/react/agent_manager.py**: Prompt generation with tool arguments
- **trustgraph-cli**: CLI tools for MCP tool management
- **Workbench**: External UI for agent tool configuration
## Requirements
### Functional Requirements
1. **MCP Tool Configuration Arguments**: MCP tool configurations MUST support an optional `arguments` array with name, type, and description fields
2. **Argument Exposure**: `McpToolImpl.get_arguments()` MUST return configured arguments instead of empty list
3. **Prompt Integration**: Agent prompts MUST include MCP tool argument details when arguments are specified
4. **Backward Compatibility**: Existing MCP tool configurations without arguments MUST continue to work
5. **CLI Support**: Existing `tg-invoke-mcp-tool` CLI supports arguments (already implemented)
### Non-Functional Requirements
1. **Backward Compatibility**: Zero breaking changes for existing MCP tool configurations
2. **Performance**: No significant performance impact on agent prompt generation
3. **Consistency**: Argument handling MUST match prompt template tool patterns
### User Stories
1. As an **agent developer**, I want to specify MCP tool arguments in configuration so that LLMs can invoke tools with correct parameters
2. As a **workbench user**, I want to configure MCP tool arguments in the UI so that agents use tools properly
3. As an **LLM in a ReACT agent**, I want to see tool argument specifications in prompts so that I can provide correct parameters
## Design
### High-Level Architecture
Extend MCP tool configuration to match the prompt template pattern by:
1. Adding optional `arguments` array to MCP tool configurations
2. Modifying `McpToolImpl` to accept and return configured arguments
3. Updating tool configuration loading to handle MCP tool arguments
4. Ensuring agent prompts include MCP tool argument information
### Configuration Schema
```json
{
"type": "mcp-tool",
"name": "get_bank_balance",
"description": "Get bank account balance",
"mcp-tool": "get_bank_balance",
"arguments": [
{
"name": "account_id",
"type": "string",
"description": "Bank account identifier"
},
{
"name": "date",
"type": "string",
"description": "Date for balance query (optional, format: YYYY-MM-DD)"
}
]
}
```
### Data Flow
1. **Configuration Loading**: MCP tool config with arguments is loaded by `on_tools_config()`
2. **Tool Creation**: Arguments are parsed and passed to `McpToolImpl` via constructor
3. **Prompt Generation**: `agent_manager.py` reads `tool.arguments` to include argument details in LLM prompts
4. **Tool Invocation**: LLM provides parameters which are passed to MCP service unchanged
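The first three steps of this flow can be sketched in a few lines. The classes below are simplified stand-ins written for this example: the real `Argument` and `McpToolImpl` live in the agent framework and may differ in shape, and `build_mcp_tool` and `render_arguments` are hypothetical helper names.

```python
from dataclasses import dataclass


@dataclass
class Argument:
    """Simplified stand-in for the framework's Argument type (assumed shape)."""
    name: str
    type: str
    description: str


class McpToolImpl:
    """Simplified stand-in: stores arguments; the real class also calls the MCP service."""

    def __init__(self, mcp_tool_id, arguments=None):
        self.mcp_tool_id = mcp_tool_id
        self.arguments = arguments or []

    def get_arguments(self):
        return self.arguments


def build_mcp_tool(config):
    """Parse the optional `arguments` array; a missing array yields an empty list."""
    arguments = [
        Argument(name=a["name"], type=a["type"], description=a["description"])
        for a in config.get("arguments", [])
    ]
    return McpToolImpl(config["mcp-tool"], arguments)


def render_arguments(tool):
    """Format argument details as lines suitable for inclusion in a prompt."""
    return [f"- {a.name} ({a.type}): {a.description}" for a in tool.get_arguments()]


config = {
    "type": "mcp-tool",
    "name": "get_bank_balance",
    "description": "Get bank account balance",
    "mcp-tool": "get_bank_balance",
    "arguments": [
        {"name": "account_id", "type": "string",
         "description": "Bank account identifier"},
    ],
}
tool = build_mcp_tool(config)

# A legacy configuration without `arguments` keeps working
legacy_config = {k: v for k, v in config.items() if k != "arguments"}
legacy_tool = build_mcp_tool(legacy_config)
```

Using `config.get("arguments", [])` is what provides the backward compatibility described in the requirements: legacy tools report an empty argument list, exactly as before.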
### API Changes
No external API changes - this is purely internal configuration and argument handling.
### Component Details
#### Component 1: service.py (Tool Configuration Loading)
- **Purpose**: Parse MCP tool configurations and create tool instances
- **Changes Required**: Add argument parsing for MCP tools (similar to prompt tools)
- **New Functionality**: Extract `arguments` array from MCP tool config and create `Argument` objects
#### Component 2: tools.py (McpToolImpl)
- **Purpose**: MCP tool implementation wrapper
- **Changes Required**: Accept arguments in constructor and return them from `get_arguments()`
- **New Functionality**: Store and expose configured arguments instead of returning empty list
#### Component 3: Workbench (External Repository)
- **Purpose**: UI for configuring agent tools
- **Changes Required**: Add argument specification UI for MCP tools
- **New Functionality**: Allow users to add/edit/remove arguments for MCP tools
#### Component 4: CLI Tools
- **Purpose**: Command-line tool management
- **Changes Required**: Support argument specification in MCP tool creation/update commands
- **New Functionality**: Accept arguments parameter in tool configuration commands
## Implementation Plan
### Phase 1: Core Agent Framework Changes
- [ ] Update `McpToolImpl` constructor to accept `arguments` parameter
- [ ] Change `McpToolImpl.get_arguments()` to return stored arguments
- [ ] Modify `service.py` MCP tool configuration parsing to handle arguments
- [ ] Add unit tests for MCP tool argument handling
- [ ] Verify agent prompts include MCP tool arguments
### Phase 2: External Tool Support
- [ ] Update CLI tools to support MCP tool argument specification
- [ ] Document argument configuration format for users
- [ ] Update Workbench UI to support MCP tool argument configuration
- [ ] Add examples and documentation
### Code Changes Summary
| File | Change Type | Description |
|------|------------|-------------|
| `tools.py` | Modified | Update McpToolImpl to accept and store arguments |
| `service.py` | Modified | Parse arguments from MCP tool config (lines 108-113) |
| `test_react_processor.py` | Modified | Add tests for MCP tool arguments |
| CLI tools | Modified | Support argument specification in commands |
| Workbench | Modified | Add UI for MCP tool argument configuration |
## Testing Strategy
### Unit Tests
- **MCP Tool Argument Parsing**: Test `service.py` correctly parses arguments from MCP tool configurations
- **McpToolImpl Arguments**: Test `get_arguments()` returns configured arguments instead of empty list
- **Backward Compatibility**: Test MCP tools without arguments continue to work (return empty list)
- **Agent Prompt Generation**: Test agent prompts include MCP tool argument details
### Integration Tests
- **End-to-End Tool Invocation**: Test agent with MCP tool arguments can successfully invoke tools
- **Configuration Loading**: Test complete config load cycle with MCP tool arguments
- **Cross-Component**: Test arguments flow correctly from config → tool creation → prompt generation
### Manual Testing
- **Agent Behavior**: Manually verify LLM receives and uses argument information in ReACT cycles
- **CLI Integration**: Test tg-invoke-mcp-tool works with new argument-configured MCP tools
- **Workbench Integration**: Test UI supports MCP tool argument configuration
## Migration and Rollout
### Migration Strategy
No migration required - this is purely additive functionality:
- Existing MCP tool configurations without `arguments` continue to work unchanged
- `McpToolImpl.get_arguments()` returns empty list for legacy tools
- New configurations can optionally include `arguments` array
### Rollout Plan
1. **Phase 1**: Deploy core agent framework changes to development/staging
2. **Phase 2**: Deploy CLI tool updates and documentation
3. **Phase 3**: Deploy Workbench UI updates for argument configuration
4. **Phase 4**: Production rollout with monitoring
### Rollback Plan
- Core changes are backward compatible - no rollback needed for functionality
- If issues arise, disable argument parsing by reverting MCP tool config loading logic
- Workbench and CLI changes are independent and can be rolled back separately
## Security Considerations
- **No new attack surface**: Arguments are parsed from existing configuration sources with no new inputs
- **Parameter validation**: Arguments are passed through to MCP tools unchanged - validation remains at MCP tool level
- **Configuration integrity**: Argument specifications are part of tool configuration - same security model applies
## Performance Impact
- **Minimal overhead**: Argument parsing happens only during configuration loading, not per-request
- **Prompt size increase**: Agent prompts will include MCP tool argument details, slightly increasing token usage
- **Memory usage**: Negligible increase for storing argument specifications in tool objects
## Documentation
### User Documentation
- [ ] Update MCP tool configuration guide with argument examples
- [ ] Add argument specification to CLI tool help text
- [ ] Create examples of common MCP tool argument patterns
### Developer Documentation
- [ ] Update McpToolImpl class documentation
- [ ] Add inline comments for argument parsing logic
- [ ] Document argument flow in system architecture
## Open Questions
1. **Argument validation**: Should we validate argument types/formats beyond basic structure checking?
2. **Dynamic discovery**: Future enhancement to query MCP servers for tool schemas automatically?
## Alternatives Considered
1. **Dynamic MCP schema discovery**: Query MCP servers for tool argument schemas at runtime - rejected due to complexity and reliability concerns
2. **Separate argument registry**: Store MCP tool arguments in separate configuration section - rejected for consistency with prompt template approach
3. **Type validation**: Full JSON schema validation for arguments - deferred as future enhancement to keep initial implementation simple
## References
- [MCP Protocol Specification](https://github.com/modelcontextprotocol/spec)
- [Prompt Template Tool Implementation](./trustgraph-flow/trustgraph/agent/react/service.py#L114-129)
- [Current MCP Tool Implementation](./trustgraph-flow/trustgraph/agent/react/tools.py#L58-86)
## Appendix
[Any additional information, diagrams, or examples]

@ -0,0 +1,279 @@
# More Configuration CLI Technical Specification
## Overview
This specification describes enhanced command-line configuration capabilities for TrustGraph, enabling users to manage individual configuration items through granular CLI commands. The integration supports four primary use cases:
1. **List Configuration Items**: Display configuration keys of a specific type
2. **Get Configuration Item**: Retrieve specific configuration values
3. **Put Configuration Item**: Set or update individual configuration items
4. **Delete Configuration Item**: Remove specific configuration items
## Goals
- **Granular Control**: Enable management of individual configuration items rather than bulk operations
- **Type-Based Listing**: Allow users to explore configuration items by type
- **Single Item Operations**: Provide commands for get/put/delete of individual config items
- **API Integration**: Leverage existing Config API for all operations
- **Consistent CLI Pattern**: Follow established TrustGraph CLI conventions and patterns
- **Error Handling**: Provide clear error messages for invalid operations
- **JSON Output**: Support structured output for programmatic use
- **Documentation**: Include comprehensive help and usage examples
## Background
TrustGraph currently provides configuration management through the Config API and a single CLI command `tg-show-config` that displays the entire configuration. While this works for viewing configuration, it lacks granular management capabilities.
Current limitations include:
- No way to list configuration items by type from CLI
- No CLI command to retrieve specific configuration values
- No CLI command to set individual configuration items
- No CLI command to delete specific configuration items
This specification addresses these gaps by adding four new CLI commands that provide granular configuration management. By exposing individual Config API operations through CLI commands, TrustGraph can:
- Enable scripted configuration management
- Allow exploration of configuration structure by type
- Support targeted configuration updates
- Provide fine-grained configuration control
## Technical Design
### Architecture
The enhanced CLI configuration requires the following technical components:
1. **tg-list-config-items**
   - Lists configuration keys for a specified type
   - Calls `Config.list(type)` API method
   - Outputs list of configuration keys

   Module: `trustgraph.cli.list_config_items`

2. **tg-get-config-item**
   - Retrieves specific configuration item(s)
   - Calls `Config.get(keys)` API method
   - Outputs configuration values in JSON format

   Module: `trustgraph.cli.get_config_item`

3. **tg-put-config-item**
   - Sets or updates a configuration item
   - Calls `Config.put(values)` API method
   - Accepts type, key, and value parameters

   Module: `trustgraph.cli.put_config_item`

4. **tg-delete-config-item**
   - Removes a configuration item
   - Calls `Config.delete(keys)` API method
   - Accepts type and key parameters

   Module: `trustgraph.cli.delete_config_item`
### Data Models
#### ConfigKey and ConfigValue
The commands utilize existing data structures from `trustgraph.api.types`:
```python
import dataclasses

@dataclasses.dataclass
class ConfigKey:
    type : str
    key : str

@dataclasses.dataclass
class ConfigValue:
    type : str
    key : str
    value : str
```
This approach allows:
- Consistent data handling across CLI and API
- Type-safe configuration operations
- Structured input/output formats
- Integration with existing Config API
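To show how the four operations use these structures, the sketch below runs them against an in-memory stand-in. `InMemoryConfig` is hypothetical and only mimics the call shapes named in this specification (`list(type)`, `get(keys)`, `put(values)`, `delete(keys)`); the real Config API's return types and error behaviour are not specified here and may differ. The `prompt` type and its keys are example values.

```python
import dataclasses


@dataclasses.dataclass
class ConfigKey:
    type: str
    key: str


@dataclasses.dataclass
class ConfigValue:
    type: str
    key: str
    value: str


class InMemoryConfig:
    """Hypothetical stand-in for the Config API, for illustration only."""

    def __init__(self):
        self._items = {}  # maps (type, key) -> value

    def put(self, values):
        for v in values:
            self._items[(v.type, v.key)] = v.value

    def get(self, keys):
        return [
            ConfigValue(k.type, k.key, self._items[(k.type, k.key)])
            for k in keys
        ]

    def list(self, type):
        return sorted(key for (t, key) in self._items if t == type)

    def delete(self, keys):
        for k in keys:
            self._items.pop((k.type, k.key), None)


config = InMemoryConfig()
config.put([
    ConfigValue("prompt", "greeting", "Hello"),
    ConfigValue("prompt", "farewell", "Bye"),
])
keys_before = config.list("prompt")
greeting = config.get([ConfigKey("prompt", "greeting")])[0].value
config.delete([ConfigKey("prompt", "farewell")])
keys_after = config.list("prompt")
```

Each CLI command described below maps onto exactly one of these calls, which is why every command takes `--type` and, except for listing, `--key`.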
### CLI Command Specifications
#### tg-list-config-items
```bash
tg-list-config-items --type <config-type> [--format text|json] [--api-url <url>]
```
- **Purpose**: List all configuration keys for a given type
- **API Call**: `Config.list(type)`
- **Output**:
  - `text` (default): Configuration keys separated by newlines
  - `json`: JSON array of configuration keys
#### tg-get-config-item
```bash
tg-get-config-item --type <type> --key <key> [--format text|json] [--api-url <url>]
```
- **Purpose**: Retrieve specific configuration item
- **API Call**: `Config.get([ConfigKey(type, key)])`
- **Output**:
  - `text` (default): Raw string value
  - `json`: JSON-encoded string value
#### tg-put-config-item
```bash
tg-put-config-item --type <type> --key <key> --value <value> [--api-url <url>]
tg-put-config-item --type <type> --key <key> --stdin [--api-url <url>]
```
- **Purpose**: Set or update configuration item
- **API Call**: `Config.put([ConfigValue(type, key, value)])`
- **Input Options**:
  - `--value`: String value provided directly on command line
  - `--stdin`: Read value from standard input
- **Output**: Success confirmation
#### tg-delete-config-item
```bash
tg-delete-config-item --type <type> --key <key> [--api-url <url>]
```
- **Purpose**: Delete configuration item
- **API Call**: `Config.delete([ConfigKey(type, key)])`
- **Output**: Success confirmation
### Implementation Details
All commands follow the established TrustGraph CLI pattern:
- Use `argparse` for command-line argument parsing
- Import and use `trustgraph.api.Api` for backend communication
- Follow the same error handling patterns as existing CLI commands
- Support the standard `--api-url` parameter for API endpoint configuration
- Provide descriptive help text and usage examples
#### Output Format Handling
**Text Format (Default)**:
- `tg-list-config-items`: One key per line, plain text
- `tg-get-config-item`: Raw string value, no quotes or encoding
**JSON Format**:
- `tg-list-config-items`: Array of strings `["key1", "key2", "key3"]`
- `tg-get-config-item`: JSON-encoded string value `"actual string value"`
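The two output formats can be expressed as a pair of small helpers. The function names `format_key_list` and `format_value` are assumptions for this sketch; only the output shapes come from the rules above.

```python
import json


def format_key_list(keys, fmt="text"):
    """Format keys for tg-list-config-items: one per line, or a JSON array."""
    if fmt == "json":
        return json.dumps(list(keys))
    return "\n".join(keys)


def format_value(value, fmt="text"):
    """Format a value for tg-get-config-item: raw text, or a JSON-encoded string."""
    if fmt == "json":
        return json.dumps(value)
    return value
```

The text format suits shell pipelines, while the JSON format keeps values with embedded quotes or newlines unambiguous for programmatic consumers.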
#### Input Handling
**tg-put-config-item** supports two mutually exclusive input methods:
- `--value <string>`: Direct command-line string value
- `--stdin`: Read entire input from standard input as the configuration value
  - stdin contents are read as raw text (preserving newlines, whitespace, etc.)
  - Supports piping from files, commands, or interactive input
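With `argparse`, the mutually exclusive input methods might be declared as follows. This is a sketch rather than the actual `put_config_item` module: `build_parser` and `read_value` are hypothetical names, and the standard `--api-url` option is omitted for brevity.

```python
import argparse
import io
import sys


def build_parser():
    parser = argparse.ArgumentParser(prog="tg-put-config-item")
    parser.add_argument("--type", required=True, help="Configuration type")
    parser.add_argument("--key", required=True, help="Configuration key")
    # Exactly one of --value / --stdin must be supplied
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--value", help="Value given on the command line")
    source.add_argument("--stdin", action="store_true",
                        help="Read the value from standard input")
    return parser


def read_value(args, stdin=None):
    """Return the configuration value, reading stdin verbatim when requested."""
    if args.stdin:
        stream = stdin if stdin is not None else sys.stdin
        return stream.read()  # raw text: newlines and whitespace preserved
    return args.value


parser = build_parser()
direct_args = parser.parse_args(
    ["--type", "prompt", "--key", "greeting", "--value", "Hello"])
piped_args = parser.parse_args(
    ["--type", "prompt", "--key", "greeting", "--stdin"])
# io.StringIO simulates piped input for the example
piped_value = read_value(piped_args, io.StringIO("line one\n  line two\n"))
```

Letting `argparse` enforce the exclusivity means supplying both `--value` and `--stdin`, or neither, produces a usage error before any API call is made.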
## Security Considerations
- **Input Validation**: All command-line parameters must be validated before API calls
- **API Authentication**: Commands inherit existing API authentication mechanisms
- **Configuration Access**: Commands respect existing configuration access controls
- **Error Information**: Error messages should not leak sensitive configuration details
## Performance Considerations
- **Single Item Operations**: Commands are designed for individual items, avoiding bulk operation overhead
- **API Efficiency**: Direct API calls minimize processing layers
- **Network Latency**: Each command makes one API call, minimizing network round trips
- **Memory Usage**: Minimal memory footprint for single-item operations
## Testing Strategy
- **Unit Tests**: Test each CLI command module independently
- **Integration Tests**: Test CLI commands against live Config API
- **Error Handling Tests**: Verify proper error handling for invalid inputs
- **API Compatibility**: Ensure commands work with existing Config API versions
## Migration Plan
No migration required - these are new CLI commands that complement existing functionality:
- Existing `tg-show-config` command remains unchanged
- New commands can be added incrementally
- No breaking changes to existing configuration workflows
## Packaging and Distribution
These commands will be added to the existing `trustgraph-cli` package:
**Package Location**: `trustgraph-cli/`
**Module Files**:
- `trustgraph-cli/trustgraph/cli/list_config_items.py`
- `trustgraph-cli/trustgraph/cli/get_config_item.py`
- `trustgraph-cli/trustgraph/cli/put_config_item.py`
- `trustgraph-cli/trustgraph/cli/delete_config_item.py`
**Entry Points**: Added to `trustgraph-cli/pyproject.toml` in `[project.scripts]` section:
```toml
tg-list-config-items = "trustgraph.cli.list_config_items:main"
tg-get-config-item = "trustgraph.cli.get_config_item:main"
tg-put-config-item = "trustgraph.cli.put_config_item:main"
tg-delete-config-item = "trustgraph.cli.delete_config_item:main"
```
## Implementation Tasks
1. **Create CLI Modules**: Implement the four CLI command modules in `trustgraph-cli/trustgraph/cli/`
2. **Update pyproject.toml**: Add new command entry points to `trustgraph-cli/pyproject.toml`
3. **Documentation**: Create CLI documentation for each command in `docs/cli/`
4. **Testing**: Implement comprehensive test coverage
5. **Integration**: Ensure commands work with existing TrustGraph infrastructure
6. **Package Build**: Verify commands are properly installed with `pip install trustgraph-cli`
## Usage Examples
#### List configuration items
```bash
# List prompt keys (text format)
tg-list-config-items --type prompt
template-1
template-2
system-prompt
# List prompt keys (JSON format)
tg-list-config-items --type prompt --format json
["template-1", "template-2", "system-prompt"]
```
#### Get configuration item
```bash
# Get prompt value (text format)
tg-get-config-item --type prompt --key template-1
You are a helpful assistant. Please respond to: {query}
# Get prompt value (JSON format)
tg-get-config-item --type prompt --key template-1 --format json
"You are a helpful assistant. Please respond to: {query}"
```
#### Set configuration item
```bash
# Set from command line
tg-put-config-item --type prompt --key new-template --value "Custom prompt: {input}"
# Set from file via pipe
cat ./prompt-template.txt | tg-put-config-item --type prompt --key complex-template --stdin
# Set from file via redirect
tg-put-config-item --type prompt --key complex-template --stdin < ./prompt-template.txt
# Set from command output
echo "Generated template: {query}" | tg-put-config-item --type prompt --key auto-template --stdin
```
#### Delete configuration item
```bash
tg-delete-config-item --type prompt --key old-template
```
## Open Questions
- Should commands support batch operations (multiple keys) in addition to single items?
- What output format should be used for success confirmations?
- How should configuration types be documented/discovered by users?
## References
- Existing Config API: `trustgraph/api/config.py`
- CLI patterns: `trustgraph-cli/trustgraph/cli/show_config.py`
- Data types: `trustgraph/api/types.py`


@ -0,0 +1,91 @@
# Schema Directory Refactoring Proposal
## Current Issues
1. **Flat structure** - All schemas in one directory makes it hard to understand relationships
2. **Mixed concerns** - Core types, domain objects, and API contracts all mixed together
3. **Unclear naming** - Files like "object.py", "types.py", "topic.py" don't clearly indicate their purpose
4. **No clear layering** - Can't easily see what depends on what
## Proposed Structure
```
trustgraph-base/trustgraph/schema/
├── __init__.py
├── core/ # Core primitive types used everywhere
│ ├── __init__.py
│ ├── primitives.py # Error, Value, Triple, Field, RowSchema
│ ├── metadata.py # Metadata record
│ └── topic.py # Topic utilities
├── knowledge/ # Knowledge domain models and extraction
│ ├── __init__.py
│ ├── graph.py # EntityContext, EntityEmbeddings, Triples
│ ├── document.py # Document, TextDocument, Chunk
│ ├── knowledge.py # Knowledge extraction types
│ ├── embeddings.py # All embedding-related types (moved from multiple files)
│ └── nlp.py # Definition, Topic, Relationship, Fact types
└── services/ # Service request/response contracts
    ├── __init__.py
    ├── llm.py            # TextCompletion, Embeddings, Tool requests/responses
    ├── retrieval.py      # GraphRAG, DocumentRAG queries/responses
    ├── query.py          # GraphEmbeddingsRequest/Response, DocumentEmbeddingsRequest/Response
    ├── agent.py          # Agent requests/responses
    ├── flow.py           # Flow requests/responses
    ├── prompt.py         # Prompt service requests/responses
    ├── config.py         # Configuration service
    ├── library.py        # Librarian service
    └── lookup.py         # Lookup service
```
## Key Changes
1. **Hierarchical organization** - Clear separation between core types, knowledge models, and service contracts
2. **Better naming**:
- `types.py` → `core/primitives.py` (clearer purpose)
- `object.py` → Split between appropriate files based on actual content
- `documents.py` → `knowledge/document.py` (singular, consistent)
- `models.py` → `services/llm.py` (clearer what kind of models)
- `prompt.py` → Split: service parts to `services/prompt.py`, data types to `knowledge/nlp.py`
3. **Logical grouping**:
- All embedding types consolidated in `knowledge/embeddings.py`
- All LLM-related service contracts in `services/llm.py`
- Clear separation of request/response pairs in services directory
- Knowledge extraction types grouped with other knowledge domain models
4. **Dependency clarity**:
- Core types have no dependencies
- Knowledge models depend only on core
- Service contracts can depend on both core and knowledge models
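The layering rules above could be checked mechanically. The following is a hedged sketch, not part of the proposal: it assumes the import graph is already available as a plain dictionary, and it assumes imports within the same layer are acceptable. The names `ALLOWED`, `layer_of`, and `find_violations` are hypothetical.

```python
# Which layers each layer may import from, per the rules above.
ALLOWED = {
    "core": set(),
    "knowledge": {"core"},
    "services": {"core", "knowledge"},
}

PREFIX = "trustgraph.schema."


def layer_of(module):
    """Return 'core', 'knowledge' or 'services'; None for anything else."""
    if not module.startswith(PREFIX):
        return None
    layer = module[len(PREFIX):].split(".")[0]
    return layer if layer in ALLOWED else None


def find_violations(imports):
    """imports maps a module name to the module names it imports."""
    bad = []
    for module, deps in imports.items():
        src = layer_of(module)
        if src is None:
            continue
        for dep in deps:
            dst = layer_of(dep)
            # Same-layer imports are allowed in this sketch (an assumption).
            if dst is not None and dst != src and dst not in ALLOWED[src]:
                bad.append((module, dep))
    return bad
```

A check like this could run in CI so that a `core` module importing from `knowledge` or `services` is caught early.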
## Migration Benefits
1. **Easier navigation** - Developers can quickly find what they need
2. **Better modularity** - Clear boundaries between different concerns
3. **Simpler imports** - More intuitive import paths
4. **Future-proof** - Easy to add new knowledge types or services without cluttering
## Example Import Changes
```python
# Before
from trustgraph.schema import Error, Triple, GraphEmbeddings, TextCompletionRequest
# After
from trustgraph.schema.core import Error, Triple
from trustgraph.schema.knowledge import GraphEmbeddings
from trustgraph.schema.services import TextCompletionRequest
```
## Implementation Notes
1. Keep backward compatibility by maintaining imports in root `__init__.py`
2. Move files gradually, updating imports as needed
3. Consider adding a `legacy.py` that imports everything for transition period
4. Update documentation to reflect new structure


@ -0,0 +1,139 @@
# Structured Data Pulsar Schema Changes
## Overview
Based on the STRUCTURED_DATA.md specification, this document proposes the necessary Pulsar schema additions and modifications to support structured data capabilities in TrustGraph.
## Required Schema Changes
### 1. Core Schema Enhancements
#### Enhanced Field Definition
The existing `Field` class in `core/primitives.py` needs additional properties:
```python
class Field(Record):
    name = String()
    type = String()  # int, string, long, bool, float, double, timestamp
    size = Integer()
    primary = Boolean()
    description = String()
    # NEW FIELDS:
    required = Boolean()  # Whether field is required
    enum_values = Array(String())  # For enum type fields
    indexed = Boolean()  # Whether field should be indexed
```
### 2. New Knowledge Schemas
#### 2.1 Structured Data Submission
New file: `knowledge/structured.py`
```python
from pulsar.schema import Record, String, Bytes, Map

from ..core.metadata import Metadata


class StructuredDataSubmission(Record):
    metadata = Metadata()
    format = String()  # "json", "csv", "xml"
    schema_name = String()  # Reference to schema in config
    data = Bytes()  # Raw data to ingest
    options = Map(String())  # Format-specific options
```
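A consumer of `StructuredDataSubmission` would need to turn the `data` bytes into rows according to `format`. The sketch below illustrates one possible decoding step using only the standard library. The function name `decode_rows` and the option keys `encoding` and `delimiter` are assumptions made for illustration, and XML is left out.

```python
import csv
import io
import json


def decode_rows(fmt, data, options=None):
    """Decode raw submission bytes into a list of dict rows."""
    options = options or {}
    text = data.decode(options.get("encoding", "utf-8"))
    if fmt == "json":
        parsed = json.loads(text)
        # Accept either a single object or an array of objects.
        return parsed if isinstance(parsed, list) else [parsed]
    if fmt == "csv":
        reader = csv.DictReader(
            io.StringIO(text), delimiter=options.get("delimiter", ",")
        )
        return [dict(row) for row in reader]
    raise ValueError(f"unsupported format: {fmt}")
```

Note that CSV values arrive as strings, so type conversion against the schema named in `schema_name` would still be a separate step.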
### 3. New Service Schemas
#### 3.1 NLP to Structured Query Service
New file: `services/nlp_query.py`
```python
from pulsar.schema import Record, String, Array, Map, Integer, Double

from ..core.primitives import Error


class NLPToStructuredQueryRequest(Record):
    natural_language_query = String()
    max_results = Integer()
    context_hints = Map(String())  # Optional context for query generation


class NLPToStructuredQueryResponse(Record):
    error = Error()
    graphql_query = String()  # Generated GraphQL query
    variables = Map(String())  # GraphQL variables if any
    detected_schemas = Array(String())  # Which schemas the query targets
    confidence = Double()
```
#### 3.2 Structured Query Service
New file: `services/structured_query.py`
```python
from pulsar.schema import Record, String, Map, Array

from ..core.primitives import Error


class StructuredQueryRequest(Record):
    query = String()  # GraphQL query
    variables = Map(String())  # GraphQL variables
    operation_name = String()  # Optional operation name for multi-operation documents


class StructuredQueryResponse(Record):
    error = Error()
    data = String()  # JSON-encoded GraphQL response data
    errors = Array(String())  # GraphQL errors if any
```
#### 2.2 Object Extraction Output
New file: `knowledge/object.py`
```python
from pulsar.schema import Record, String, Map, Double

from ..core.metadata import Metadata


class ExtractedObject(Record):
    metadata = Metadata()
    schema_name = String()  # Which schema this object belongs to
    values = Map(String())  # Field name -> value
    confidence = Double()
    source_span = String()  # Text span where object was found
```
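Because `values` is declared as `Map(String())`, field values presumably travel as strings and need converting at each end. The sketch below shows one possible convention, using the type names listed on `Field.type`. The helpers `encode_values` and `decode_values` and the choice of JSON text for non-string values are assumptions, not part of the proposed schema.

```python
import json

# Converters keyed by the Field.type names used in this document.
CONVERTERS = {
    "string": str,
    "int": int,
    "long": int,
    "float": float,
    "double": float,
    "bool": lambda raw: raw.lower() == "true",
}


def encode_values(obj):
    """Turn a dict of Python values into a string-to-string map."""
    out = {}
    for name, value in obj.items():
        if value is None:
            continue  # missing optional fields are simply omitted
        out[name] = value if isinstance(value, str) else json.dumps(value)
    return out


def decode_values(values, field_types):
    """Convert string values back, defaulting unknown fields to string."""
    return {
        name: CONVERTERS.get(field_types.get(name, "string"), str)(raw)
        for name, raw in values.items()
    }
```

Timestamps are not handled here; a real implementation would need to agree on a wire format for them first.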
### 4. Enhanced Knowledge Schemas
#### 4.1 Object Embeddings Enhancement
Update `knowledge/embeddings.py` to better support structured object embeddings:
```python
from pulsar.schema import Record, String, Array, Map, Double

from ..core.metadata import Metadata


class StructuredObjectEmbedding(Record):
    metadata = Metadata()
    vectors = Array(Array(Double()))
    schema_name = String()
    object_id = String()  # Primary key value
    field_embeddings = Map(Array(Double()))  # Per-field embeddings
```
## Integration Points
### Flow Integration
The schemas will be used by new flow modules:
- `trustgraph-flow/trustgraph/decoding/structured` - Uses StructuredDataSubmission
- `trustgraph-flow/trustgraph/query/nlp_query/cassandra` - Uses NLP query schemas
- `trustgraph-flow/trustgraph/query/objects/cassandra` - Uses structured query schemas
- `trustgraph-flow/trustgraph/extract/object/row/` - Consumes Chunk, produces ExtractedObject
- `trustgraph-flow/trustgraph/storage/objects/cassandra` - Uses Rows schema
- `trustgraph-flow/trustgraph/embeddings/object_embeddings/qdrant` - Uses object embedding schemas
## Implementation Notes
1. **Schema Versioning**: Consider adding a `version` field to RowSchema for future migration support
2. **Type System**: The `Field.type` should support all Cassandra native types
3. **Batch Operations**: Most services should support both single and batch operations
4. **Error Handling**: Consistent error reporting across all new services
5. **Backwards Compatibility**: Existing schemas remain unchanged except for minor Field enhancements
## Next Steps
1. Implement schema files in the new structure
2. Update existing services to recognize new schema types
3. Implement flow modules that use these schemas
4. Add gateway/rev-gateway endpoints for new services
5. Create unit tests for schema validation


@ -0,0 +1,253 @@
# Structured Data Technical Specification
## Overview
This specification describes the integration of TrustGraph with structured data flows, enabling the system to work with data that can be represented as rows in tables or objects in object stores. The integration supports four primary use cases:
1. **Unstructured to Structured Extraction**: Read unstructured data sources, identify and extract object structures, and store them in a tabular format
2. **Structured Data Ingestion**: Load data that is already in structured formats directly into the structured store alongside extracted data
3. **Natural Language Querying**: Convert natural language questions into structured queries to extract matching data from the store
4. **Direct Structured Querying**: Execute structured queries directly against the data store for precise data retrieval
## Goals
- **Unified Data Access**: Provide a single interface for accessing both structured and unstructured data within TrustGraph
- **Seamless Integration**: Enable smooth interoperability between TrustGraph's graph-based knowledge representation and traditional structured data formats
- **Flexible Extraction**: Support automatic extraction of structured data from various unstructured sources (documents, text, etc.)
- **Query Versatility**: Allow users to query data using both natural language and structured query languages
- **Data Consistency**: Maintain data integrity and consistency across different data representations
- **Performance Optimization**: Ensure efficient storage and retrieval of structured data at scale
- **Schema Flexibility**: Support both schema-on-write and schema-on-read approaches to accommodate diverse data sources
- **Backwards Compatibility**: Preserve existing TrustGraph functionality while adding structured data capabilities
## Background
TrustGraph currently excels at processing unstructured data and building knowledge graphs from diverse sources. However, many enterprise use cases involve data that is inherently structured - customer records, transaction logs, inventory databases, and other tabular datasets. These structured datasets often need to be analyzed alongside unstructured content to provide comprehensive insights.
Current limitations include:
- No native support for ingesting pre-structured data formats (CSV, JSON arrays, database exports)
- Inability to preserve the inherent structure when extracting tabular data from documents
- Lack of efficient querying mechanisms for structured data patterns
- Missing bridge between SQL-like queries and TrustGraph's graph queries
This specification addresses these gaps by introducing a structured data layer that complements TrustGraph's existing capabilities. By supporting structured data natively, TrustGraph can:
- Serve as a unified platform for both structured and unstructured data analysis
- Enable hybrid queries that span both graph relationships and tabular data
- Provide familiar interfaces for users accustomed to working with structured data
- Unlock new use cases in data integration and business intelligence
## Technical Design
### Architecture
The structured data integration requires the following technical components:
1. **NLP-to-Structured-Query Service**
- Converts natural language questions into structured queries
- Supports multiple query language targets (initially SQL-like syntax)
- Integrates with existing TrustGraph NLP capabilities
Module: trustgraph-flow/trustgraph/query/nlp_query/cassandra
2. **Configuration Schema Support** **[COMPLETE]**
- Extended configuration system to store structured data schemas
- Support for defining table structures, field types, and relationships
- Schema versioning and migration capabilities
3. **Object Extraction Module** **[COMPLETE]**
- Enhanced knowledge extractor flow integration
- Identifies and extracts structured objects from unstructured sources
- Maintains provenance and confidence scores
- Registers a config handler (example: trustgraph-flow/trustgraph/prompt/template/service.py) to receive config data and decode schema information
- Receives objects and decodes them to ExtractedObject objects for delivery on the Pulsar queue
- NOTE: Existing code at `trustgraph-flow/trustgraph/extract/object/row/` is a previous attempt that does not conform to current APIs and would need major refactoring. Reuse it where it helps; otherwise start from scratch.
- Requires a command-line interface: `kg-extract-objects`
Module: trustgraph-flow/trustgraph/extract/kg/objects/
4. **Structured Store Writer Module** **[COMPLETE]**
- Receives objects in ExtractedObject format from Pulsar queues
- Initial implementation targeting Apache Cassandra as the structured data store
- Handles dynamic table creation based on schemas encountered
- Manages schema-to-Cassandra table mapping and data transformation
- Provides batch and streaming write operations for performance optimization
- No Pulsar outputs - this is a terminal service in the data flow
**Schema Handling**:
- Monitors incoming ExtractedObject messages for schema references
- When a new schema is encountered for the first time, automatically creates the corresponding Cassandra table
- Maintains a cache of known schemas to avoid redundant table creation attempts
- Open design point: whether to receive schema definitions directly or to rely on the schema names carried in ExtractedObject messages
**Cassandra Table Mapping**:
- Keyspace is named after the `user` field from ExtractedObject's Metadata
- Table is named after the `schema_name` field from ExtractedObject
- Collection from Metadata becomes part of the partition key to ensure:
- Natural data distribution across Cassandra nodes
- Efficient queries within a specific collection
- Logical isolation between different data imports/sources
- Primary key structure: `PRIMARY KEY ((collection, <schema_primary_key_fields>), <clustering_keys>)`
- Collection is always the first component of the partition key
- Schema-defined primary key fields follow as part of the composite partition key
- This requires queries to specify the collection, ensuring predictable performance
- Field definitions map to Cassandra columns with type conversions:
- `string` → `text`
- `integer` → `int` or `bigint` based on size hint
- `float` → `float` or `double` based on precision needs
- `boolean` → `boolean`
- `timestamp` → `timestamp`
- `enum` → `text` with application-level validation
- Indexed fields create Cassandra secondary indexes (excluding fields already in the primary key)
- Required fields are enforced at the application level (Cassandra doesn't support NOT NULL)
**Object Storage**:
- Extracts values from ExtractedObject.values map
- Performs type conversion and validation before insertion
- Handles missing optional fields gracefully
- Maintains metadata about object provenance (source document, confidence scores)
- Supports idempotent writes to handle message replay scenarios
**Implementation Notes**:
- Existing code at `trustgraph-flow/trustgraph/storage/objects/cassandra/` is outdated and doesn't comply with current APIs
- Should reference `trustgraph-flow/trustgraph/storage/triples/cassandra` as an example of a working storage processor
- Needs evaluation of existing code for any reusable components before deciding to refactor or rewrite
Module: trustgraph-flow/trustgraph/storage/objects/cassandra
5. **Structured Query Service**
- Accepts structured queries in defined formats
- Executes queries against the structured store
- Returns objects matching query criteria
- Supports pagination and result filtering
Module: trustgraph-flow/trustgraph/query/objects/cassandra
6. **Agent Tool Integration**
- New tool class for agent frameworks
- Enables agents to query structured data stores
- Provides natural language and structured query interfaces
- Integrates with existing agent decision-making processes
7. **Structured Data Ingestion Service**
- Accepts structured data in multiple formats (JSON, CSV, XML)
- Parses and validates incoming data against defined schemas
- Converts data into normalized object streams
- Emits objects to appropriate message queues for processing
- Supports bulk uploads and streaming ingestion
Module: trustgraph-flow/trustgraph/decoding/structured
8. **Object Embedding Service**
- Generates vector embeddings for structured objects
- Enables semantic search across structured data
- Supports hybrid search combining structured queries with semantic similarity
- Integrates with existing vector stores
Module: trustgraph-flow/trustgraph/embeddings/object_embeddings/qdrant
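The Cassandra table mapping described for the Structured Store Writer (item 4) can be sketched as a function that turns a schema definition into CQL statements. This is an illustration under several assumptions: the schema is a plain dictionary, the `size` hint is a byte count where anything above 4 selects the wider type, clustering keys are omitted, and identifiers are trusted rather than quoted or validated. The function names are hypothetical.

```python
# Direct type mappings from the list in item 4.
CQL_TYPES = {
    "string": "text",
    "boolean": "boolean",
    "timestamp": "timestamp",
    "enum": "text",
}


def cql_type(field):
    """Pick a CQL type; the size threshold is an assumption of this sketch."""
    kind = field["type"]
    wide = field.get("size", 0) > 4
    if kind == "integer":
        return "bigint" if wide else "int"
    if kind == "float":
        return "double" if wide else "float"
    return CQL_TYPES[kind]


def create_table_cql(keyspace, schema):
    """Return CREATE TABLE plus CREATE INDEX statements for one schema."""
    table = f'{keyspace}.{schema["name"]}'
    pk = [f["name"] for f in schema["fields"] if f.get("primary_key")]
    columns = ["collection text"]
    columns += [f'{f["name"]} {cql_type(f)}' for f in schema["fields"]]
    # Collection always leads the composite partition key.
    partition = ", ".join(["collection"] + pk)
    statements = [
        f"CREATE TABLE IF NOT EXISTS {table} "
        f'({", ".join(columns)}, PRIMARY KEY (({partition})))'
    ]
    for name in schema.get("indexes", []):
        if name in pk:
            continue  # primary key fields need no secondary index
        index = f'{schema["name"]}_{name}_idx'
        statements.append(
            f"CREATE INDEX IF NOT EXISTS {index} ON {table} ({name})"
        )
    return statements
```

Using `IF NOT EXISTS` keeps table creation safe to repeat, which fits the requirement that the writer tolerates message replay.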
### Data Models
#### Schema Storage Mechanism
Schemas are stored in TrustGraph's configuration system using the following structure:
- **Type**: `schema` (fixed value for all structured data schemas)
- **Key**: The unique name/identifier of the schema (e.g., `customer_records`, `transaction_log`)
- **Value**: JSON schema definition containing the structure
Example configuration entry:
```
Type: schema
Key: customer_records
Value: {
  "name": "customer_records",
  "description": "Customer information table",
  "fields": [
    {
      "name": "customer_id",
      "type": "string",
      "primary_key": true
    },
    {
      "name": "name",
      "type": "string",
      "required": true
    },
    {
      "name": "email",
      "type": "string",
      "required": true
    },
    {
      "name": "registration_date",
      "type": "timestamp"
    },
    {
      "name": "status",
      "type": "string",
      "enum": ["active", "inactive", "suspended"]
    }
  ],
  "indexes": ["email", "registration_date"]
}
```
This approach allows:
- Dynamic schema definition without code changes
- Easy schema updates and versioning
- Consistent integration with existing TrustGraph configuration management
- Support for multiple schemas within a single deployment
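A service that receives such a configuration value would need to decode it before use. The sketch below parses the JSON value into small dataclasses and rejects indexes that name unknown fields. The class and function names (`FieldDef`, `SchemaDef`, `parse_schema`) are hypothetical and the index check is an assumption, not a documented rule.

```python
import json
from dataclasses import dataclass, field


@dataclass
class FieldDef:
    name: str
    type: str
    primary_key: bool = False
    required: bool = False
    enum: list = field(default_factory=list)


@dataclass
class SchemaDef:
    name: str
    description: str
    fields: list
    indexes: list


def parse_schema(value):
    """Decode the JSON configuration value into a SchemaDef."""
    raw = json.loads(value)
    fields = [
        FieldDef(
            name=f["name"],
            type=f["type"],
            primary_key=f.get("primary_key", False),
            required=f.get("required", False),
            enum=f.get("enum", []),
        )
        for f in raw["fields"]
    ]
    known = {f.name for f in fields}
    indexes = raw.get("indexes", [])
    unknown = [name for name in indexes if name not in known]
    if unknown:
        raise ValueError(f"indexes refer to unknown fields: {unknown}")
    return SchemaDef(raw["name"], raw.get("description", ""), fields, indexes)
```

Validating at load time means a malformed schema is reported when the configuration changes, rather than when the first object arrives.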
### APIs
New APIs:
- Pulsar schemas for above types
- Pulsar interfaces in new flows
- Need a means to specify schema types in flows so that flows know which
schema types to load
- APIs added to gateway and rev-gateway
Modified APIs:
- Knowledge extraction endpoints - Add structured object output option
- Agent endpoints - Add structured data tool support
### Implementation Details
Implementation follows existing conventions: these are new processing
modules. Everything lives in the trustgraph-flow packages, except for
schema items, which live in trustgraph-base.
Some UI work in the Workbench is needed to demo or pilot this
capability.
## Security Considerations
No extra considerations.
## Performance Considerations
Open questions remain about how to design Cassandra queries and indexes
so that query performance does not degrade.
## Testing Strategy
Follow the existing test strategy: build unit, contract, and integration tests.
## Migration Plan
None.
## Timeline
Not specified.
## Open Questions
- Can this be made to work with other store types? The aim is to define
interfaces such that modules written for one store can also be applied
to other stores.
## References
n/a.


@ -187,7 +187,7 @@ Final Answer: Machine learning is a field of AI that enables computers to learn
# Verify tool was executed
graph_rag_client = mock_flow_context("graph-rag-request")
graph_rag_client.rag.assert_called_once_with("What is machine learning?")
graph_rag_client.rag.assert_called_once_with("What is machine learning?", collection="default")
@pytest.mark.asyncio
async def test_agent_manager_react_with_final_answer(self, agent_manager, mock_flow_context):
@ -272,7 +272,7 @@ Args: {{
# Verify correct service was called
if tool_name == "knowledge_query":
mock_flow_context("graph-rag-request").rag.assert_called()
mock_flow_context("graph-rag-request").rag.assert_called_with("test question", collection="default")
elif tool_name == "text_completion":
mock_flow_context("prompt-request").question.assert_called()
@ -713,4 +713,127 @@ Final Answer: {
# Should not raise JSON serialization errors
json_str = json.dumps(variables, indent=4)
assert len(json_str) > 0
assert len(json_str) > 0
@pytest.mark.asyncio
async def test_knowledge_query_with_default_collection(self, mock_flow_context):
"""Test KnowledgeQueryImpl uses default collection when not specified"""
# Arrange
tool = KnowledgeQueryImpl(mock_flow_context)
# Act
result = await tool.invoke(question="What is AI?")
# Assert
graph_rag_client = mock_flow_context("graph-rag-request")
graph_rag_client.rag.assert_called_once_with("What is AI?", collection="default")
@pytest.mark.asyncio
async def test_knowledge_query_with_custom_collection(self, mock_flow_context):
"""Test KnowledgeQueryImpl uses custom collection when specified"""
# Arrange
tool = KnowledgeQueryImpl(mock_flow_context, collection="custom_collection")
# Act
result = await tool.invoke(question="What is machine learning?")
# Assert
graph_rag_client = mock_flow_context("graph-rag-request")
graph_rag_client.rag.assert_called_once_with("What is machine learning?", collection="custom_collection")
@pytest.mark.asyncio
async def test_knowledge_query_with_none_collection(self, mock_flow_context):
"""Test KnowledgeQueryImpl handles None collection properly"""
# Arrange
tool = KnowledgeQueryImpl(mock_flow_context, collection=None)
# Act
result = await tool.invoke(question="Explain neural networks")
# Assert
graph_rag_client = mock_flow_context("graph-rag-request")
graph_rag_client.rag.assert_called_once_with("Explain neural networks", collection="default")
@pytest.mark.asyncio
async def test_agent_manager_knowledge_query_collection_integration(self, mock_flow_context):
"""Test agent manager integration with KnowledgeQueryImpl collection parameter"""
# Arrange
custom_tools = {
"knowledge_query_custom": Tool(
name="knowledge_query_custom",
description="Query custom knowledge collection",
arguments=[
Argument(
name="question",
type="string",
description="The question to ask"
)
],
implementation=KnowledgeQueryImpl,
config={"collection": "research_papers"}
),
"knowledge_query_default": Tool(
name="knowledge_query_default",
description="Query default knowledge collection",
arguments=[
Argument(
name="question",
type="string",
description="The question to ask"
)
],
implementation=KnowledgeQueryImpl,
config={}
)
}
agent = AgentManager(tools=custom_tools, additional_context="")
# Mock response for custom collection query
mock_flow_context("prompt-request").agent_react.return_value = """Thought: I need to search in the research papers
Action: knowledge_query_custom
Args: {
"question": "Latest AI research?"
}"""
think_callback = AsyncMock()
observe_callback = AsyncMock()
# Act
action = await agent.react("Find latest research", [], think_callback, observe_callback, mock_flow_context)
# Assert
assert isinstance(action, Action)
assert action.name == "knowledge_query_custom"
# Verify the custom collection was used
graph_rag_client = mock_flow_context("graph-rag-request")
graph_rag_client.rag.assert_called_once_with("Latest AI research?", collection="research_papers")
@pytest.mark.asyncio
async def test_knowledge_query_multiple_collections(self, mock_flow_context):
"""Test multiple KnowledgeQueryImpl instances with different collections"""
# Arrange
tools = {
"general_kb": KnowledgeQueryImpl(mock_flow_context, collection="general"),
"technical_kb": KnowledgeQueryImpl(mock_flow_context, collection="technical"),
"research_kb": KnowledgeQueryImpl(mock_flow_context, collection="research")
}
# Act & Assert for each tool
test_cases = [
("general_kb", "What is Python?", "general"),
("technical_kb", "Explain TCP/IP", "technical"),
("research_kb", "Latest ML papers", "research")
]
for tool_name, question, expected_collection in test_cases:
# Reset mock
mock_flow_context("graph-rag-request").reset_mock()
# Invoke tool
await tools[tool_name].invoke(question=question)
# Verify correct collection was used
graph_rag_client = mock_flow_context("graph-rag-request")
graph_rag_client.rag.assert_called_once_with(question, collection=expected_collection)


@ -0,0 +1,336 @@
"""
Integration tests for CLI configuration commands.
Tests the full command execution flow with mocked API responses
to verify end-to-end functionality.
"""
import pytest
import json
import sys
from unittest.mock import patch, Mock, MagicMock
from io import StringIO
# Import the CLI modules directly for integration testing
from trustgraph.cli.list_config_items import main as list_main
from trustgraph.cli.get_config_item import main as get_main
from trustgraph.cli.put_config_item import main as put_main
from trustgraph.cli.delete_config_item import main as delete_main
class TestConfigCLIIntegration:
"""Test CLI commands with mocked API responses."""
@patch('trustgraph.cli.list_config_items.Api')
def test_list_config_items_integration(self, mock_api_class, capsys):
"""Test tg-list-config-items with mocked API response."""
# Mock the API and config objects
mock_api = MagicMock()
mock_config = MagicMock()
mock_api.config.return_value = mock_config
mock_api_class.return_value = mock_api
# Mock the list response
mock_config.list.return_value = ["template-1", "template-2", "system-prompt"]
# Run the command with test args
test_args = [
'tg-list-config-items',
'--type', 'prompt',
'--format', 'json'
]
with patch('sys.argv', test_args):
list_main()
captured = capsys.readouterr()
output = json.loads(captured.out.strip())
assert output == ["template-1", "template-2", "system-prompt"]
@patch('trustgraph.cli.get_config_item.Api')
def test_get_config_item_integration(self, mock_api_class, capsys):
"""Test tg-get-config-item with mocked API response."""
from trustgraph.api.types import ConfigValue
# Mock the API and config objects
mock_api = MagicMock()
mock_config = MagicMock()
mock_api.config.return_value = mock_config
mock_api_class.return_value = mock_api
# Mock the get response
mock_config_value = ConfigValue(
type="prompt",
key="template-1",
value="You are a helpful assistant. Please respond to: {query}"
)
mock_config.get.return_value = [mock_config_value]
# Run the command with test args
test_args = [
'tg-get-config-item',
'--type', 'prompt',
'--key', 'template-1',
'--format', 'text'
]
with patch('sys.argv', test_args):
get_main()
captured = capsys.readouterr()
assert captured.out.strip() == "You are a helpful assistant. Please respond to: {query}"
@patch('trustgraph.cli.put_config_item.Api')
def test_put_config_item_integration(self, mock_api_class, capsys):
"""Test tg-put-config-item with mocked API response."""
# Mock the API and config objects
mock_api = MagicMock()
mock_config = MagicMock()
mock_api.config.return_value = mock_config
mock_api_class.return_value = mock_api
# Run the command with test args
test_args = [
'tg-put-config-item',
'--type', 'prompt',
'--key', 'new-template',
'--value', 'Custom prompt: {input}'
]
with patch('sys.argv', test_args):
put_main()
captured = capsys.readouterr()
assert "Configuration item set: prompt/new-template" in captured.out
@patch('trustgraph.cli.delete_config_item.Api')
def test_delete_config_item_integration(self, mock_api_class, capsys):
"""Test tg-delete-config-item with mocked API response."""
# Mock the API and config objects
mock_api = MagicMock()
mock_config = MagicMock()
mock_api.config.return_value = mock_config
mock_api_class.return_value = mock_api
# Run the command with test args
test_args = [
'tg-delete-config-item',
'--type', 'prompt',
'--key', 'old-template'
]
with patch('sys.argv', test_args):
delete_main()
captured = capsys.readouterr()
assert "Configuration item deleted: prompt/old-template" in captured.out
@patch('trustgraph.cli.put_config_item.Api')
def test_put_config_item_stdin_integration(self, mock_api_class, capsys):
"""Test tg-put-config-item with stdin input."""
# Mock the API and config objects
mock_api = MagicMock()
mock_config = MagicMock()
mock_api.config.return_value = mock_config
mock_api_class.return_value = mock_api
stdin_content = "Multi-line template:\nLine 1\nLine 2"
# Run the command with test args and mocked stdin
test_args = [
'tg-put-config-item',
'--type', 'prompt',
'--key', 'multiline-template',
'--stdin'
]
with patch('sys.argv', test_args), \
patch('sys.stdin', StringIO(stdin_content)):
put_main()
captured = capsys.readouterr()
assert "Configuration item set: prompt/multiline-template" in captured.out
@patch('trustgraph.cli.list_config_items.Api')
def test_api_error_handling_integration(self, mock_api_class, capsys):
"""Test CLI commands handle API errors gracefully."""
# Mock API to raise an exception
mock_api_class.side_effect = Exception("Configuration type not found")
test_args = [
'tg-list-config-items',
'--type', 'nonexistent'
]
with patch('sys.argv', test_args):
list_main()
captured = capsys.readouterr()
assert "Exception:" in captured.out
assert "Configuration type not found" in captured.out
def test_list_help_message(self):
"""Test that help message is displayed correctly."""
test_args = ['tg-list-config-items', '--help']
with patch('sys.argv', test_args):
with pytest.raises(SystemExit) as exc_info:
list_main()
# Help command exits with code 0
assert exc_info.value.code == 0
def test_missing_required_args(self):
"""Test that missing required arguments are handled."""
# Test list without --type
test_args = ['tg-list-config-items']
with patch('sys.argv', test_args):
with pytest.raises(SystemExit) as exc_info:
list_main()
# Missing required args exit with non-zero code
assert exc_info.value.code != 0
# Test get without --key
test_args = ['tg-get-config-item', '--type', 'prompt']
with patch('sys.argv', test_args):
with pytest.raises(SystemExit) as exc_info:
get_main()
assert exc_info.value.code != 0
def test_mutually_exclusive_put_args(self):
"""Test that --value and --stdin are mutually exclusive."""
test_args = [
'tg-put-config-item',
'--type', 'prompt',
'--key', 'test',
'--value', 'test',
'--stdin'
]
with patch('sys.argv', test_args):
with pytest.raises(SystemExit) as exc_info:
put_main()
assert exc_info.value.code != 0
class TestConfigCLIWorkflow:
"""Test complete workflows using multiple commands."""
@patch('trustgraph.cli.put_config_item.Api')
@patch('trustgraph.cli.get_config_item.Api')
def test_put_then_get_workflow(self, mock_get_api, mock_put_api, capsys):
"""Test putting a config item then retrieving it."""
from trustgraph.api.types import ConfigValue
# Mock put API
mock_put_config = MagicMock()
mock_put_api.return_value.config.return_value = mock_put_config
# Mock get API
mock_get_config = MagicMock()
mock_get_api.return_value.config.return_value = mock_get_config
mock_config_value = ConfigValue(
type="prompt",
key="workflow-test",
value="Workflow test value"
)
mock_get_config.get.return_value = [mock_config_value]
# Put config item
put_args = [
'tg-put-config-item',
'--type', 'prompt',
'--key', 'workflow-test',
'--value', 'Workflow test value'
]
with patch('sys.argv', put_args):
put_main()
put_output = capsys.readouterr()
assert "Configuration item set" in put_output.out
# Get config item
get_args = [
'tg-get-config-item',
'--type', 'prompt',
'--key', 'workflow-test'
]
with patch('sys.argv', get_args):
get_main()
get_output = capsys.readouterr()
assert get_output.out.strip() == "Workflow test value"
@patch('trustgraph.cli.list_config_items.Api')
@patch('trustgraph.cli.put_config_item.Api')
@patch('trustgraph.cli.delete_config_item.Api')
def test_list_put_delete_workflow(self, mock_delete_api, mock_put_api, mock_list_api, capsys):
"""Test list, put, then delete workflow."""
# Mock list API (empty initially, then with item)
mock_list_config = MagicMock()
mock_list_api.return_value.config.return_value = mock_list_config
mock_list_config.list.side_effect = [[], ["new-item"]] # Empty first, then has item
# Mock put API
mock_put_config = MagicMock()
mock_put_api.return_value.config.return_value = mock_put_config
# Mock delete API
mock_delete_config = MagicMock()
mock_delete_api.return_value.config.return_value = mock_delete_config
# List (should be empty)
list_args1 = [
'tg-list-config-items',
'--type', 'prompt',
'--format', 'json'
]
with patch('sys.argv', list_args1):
list_main()
list_output1 = capsys.readouterr()
assert json.loads(list_output1.out.strip()) == []
# Put item
put_args = [
'tg-put-config-item',
'--type', 'prompt',
'--key', 'new-item',
'--value', 'New item value'
]
with patch('sys.argv', put_args):
put_main()
put_output = capsys.readouterr()
assert "Configuration item set" in put_output.out
# List (should contain new item)
list_args2 = [
'tg-list-config-items',
'--type', 'prompt',
'--format', 'json'
]
with patch('sys.argv', list_args2):
list_main()
list_output2 = capsys.readouterr()
assert json.loads(list_output2.out.strip()) == ["new-item"]
# Delete item
delete_args = [
'tg-delete-config-item',
'--type', 'prompt',
'--key', 'new-item'
]
with patch('sys.argv', delete_args):
delete_main()
delete_output = capsys.readouterr()
assert "Configuration item deleted" in delete_output.out
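
The workflow tests above share one pattern: replace the `Api` class with a mock, patch `sys.argv`, call the command's `main()`, and read what it printed. The sketch below reproduces that pattern in a single file so it runs without TrustGraph installed. `Api`, `list_main`, `run_cli`, and `demo` are hypothetical stand-ins, not the project's code. `patch.dict(globals(), ...)` is used in place of the `@patch('trustgraph.cli...Api')` decorators only because the sketch has no importable module path.

```python
import argparse
import io
import json
from contextlib import redirect_stdout
from unittest.mock import MagicMock, patch


class Api:
    """Hypothetical stand-in for the real client; the sketch never calls it."""

    def __init__(self, url):
        self.url = url

    def config(self):
        raise RuntimeError("no network access in tests")


def list_main():
    """Hypothetical CLI entry point: parse args, query the API, print JSON."""
    parser = argparse.ArgumentParser(prog="list-items")
    parser.add_argument("--type", required=True)
    args = parser.parse_args()  # reads sys.argv[1:] at call time
    keys = Api("http://localhost:8088/").config().list(args.type)
    print(json.dumps(keys))


def run_cli(argv):
    """Run list_main with a patched sys.argv and return its stdout."""
    buffer = io.StringIO()
    with patch("sys.argv", argv), redirect_stdout(buffer):
        list_main()
    return buffer.getvalue().strip()


def demo():
    """List twice; side_effect hands back a different result per call."""
    mock_api_class = MagicMock()
    mock_config = MagicMock()
    mock_api_class.return_value.config.return_value = mock_config
    mock_config.list.side_effect = [[], ["new-item"]]

    # Swap the module-level Api name for the mock while the commands run.
    with patch.dict(globals(), {"Api": mock_api_class}):
        first = json.loads(run_cli(["list-items", "--type", "prompt"]))
        second = json.loads(run_cli(["list-items", "--type", "prompt"]))
    return first, second
```

Setting `side_effect` to a list makes each call to the mocked `list()` return the next element, which is how `test_list_put_delete_workflow` simulates an item appearing between two listings.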

View file

@ -305,6 +305,639 @@ Answer: The capital of France is Paris."""
assert reasoning_plan[1]["action"] == "find_population"
assert all("step" in step for step in reasoning_plan)
def test_multi_iteration_react_execution(self):
"""Test complete multi-iteration ReACT cycle with sequential tool invocations
This test simulates a complex query that requires:
1. Tool #1: Search for initial information
2. Tool #2: Analyze/refine based on Tool #1's output
3. Tool #3: Generate final answer using accumulated context
Each iteration includes Think -> Act -> Observe phases with
observations feeding into subsequent thinking phases.
"""
# Arrange
question = "Find the GDP of the capital of Japan and compare it to Tokyo's population"
# Mock tools that build on each other's outputs
tool_invocation_log = []
def mock_geo_search(query):
"""Tool 1: Geographic information search"""
tool_invocation_log.append(("geo_search", query))
if "capital" in query.lower() and "japan" in query.lower():
return {"city": "Tokyo", "country": "Japan", "is_capital": True}
return {"error": "Location not found"}
def mock_economic_data(query, context=None):
"""Tool 2: Economic data retrieval (uses context from Tool 1)"""
tool_invocation_log.append(("economic_data", query, context))
if context and context.get("city") == "Tokyo":
return {"city": "Tokyo", "gdp_trillion_yen": 115.7, "year": 2023}
return {"error": "Economic data not available"}
def mock_demographic_data(query, context=None):
"""Tool 3: Demographic data and comparison (uses context from Tools 1 & 2)"""
tool_invocation_log.append(("demographic_data", query, context))
if context and context.get("city") == "Tokyo":
population_millions = 14.0
gdp_from_context = context.get("gdp_trillion_yen", 0)
return {
"city": "Tokyo",
"population_millions": population_millions,
"gdp_trillion_yen": gdp_from_context,
"gdp_per_capita_million_yen": round(gdp_from_context / population_millions, 2) if population_millions > 0 else 0
}
return {"error": "Demographic data not available"}
# Execute multi-iteration ReACT cycle
def execute_multi_iteration_react(question, tools):
"""Execute a complete multi-iteration ReACT cycle"""
iterations = []
context = {}
# Iteration 1: Initial geographic search
iteration_1 = {
"iteration": 1,
"think": "I need to first identify the capital of Japan to get its GDP",
"act": {"tool": "geo_search", "query": "capital of Japan"},
"observe": None
}
result_1 = tools["geo_search"](iteration_1["act"]["query"])
iteration_1["observe"] = f"Found that {result_1['city']} is the capital of {result_1['country']}"
context.update(result_1)
iterations.append(iteration_1)
# Iteration 2: Get economic data using context from iteration 1
iteration_2 = {
"iteration": 2,
"think": f"Now I know {context['city']} is the capital. I need to get its GDP data",
"act": {"tool": "economic_data", "query": f"GDP of {context['city']}"},
"observe": None
}
result_2 = tools["economic_data"](iteration_2["act"]["query"], context)
iteration_2["observe"] = f"Retrieved GDP data: {result_2['gdp_trillion_yen']} trillion yen for {result_2['year']}"
context.update(result_2)
iterations.append(iteration_2)
# Iteration 3: Get demographic data and compare using accumulated context
iteration_3 = {
"iteration": 3,
"think": f"I have the GDP ({context['gdp_trillion_yen']} trillion yen). Now I need population data to compare",
"act": {"tool": "demographic_data", "query": f"population of {context['city']}"},
"observe": None
}
result_3 = tools["demographic_data"](iteration_3["act"]["query"], context)
iteration_3["observe"] = f"Population is {result_3['population_millions']} million. GDP per capita is {result_3['gdp_per_capita_million_yen']} million yen"
context.update(result_3)
iterations.append(iteration_3)
# Final answer synthesis
final_answer = {
"think": "I now have all the information needed to answer the question",
"answer": f"Tokyo, the capital of Japan, has a GDP of {context['gdp_trillion_yen']} trillion yen and a population of {context['population_millions']} million people, resulting in a GDP per capita of {context['gdp_per_capita_million_yen']} million yen."
}
return {
"iterations": iterations,
"final_answer": final_answer,
"context": context,
"tool_invocations": len(tool_invocation_log)
}
tools = {
"geo_search": mock_geo_search,
"economic_data": mock_economic_data,
"demographic_data": mock_demographic_data
}
# Act
result = execute_multi_iteration_react(question, tools)
# Assert - Verify complete multi-iteration execution
assert len(result["iterations"]) == 3, "Should have exactly 3 iterations"
# Verify each iteration has complete Think-Act-Observe cycle
for i, iteration in enumerate(result["iterations"], 1):
assert iteration["iteration"] == i
assert "think" in iteration and len(iteration["think"]) > 0
assert "act" in iteration and "tool" in iteration["act"]
assert "observe" in iteration and iteration["observe"] is not None
# Verify sequential tool invocations
assert tool_invocation_log[0][0] == "geo_search"
assert tool_invocation_log[1][0] == "economic_data"
assert tool_invocation_log[2][0] == "demographic_data"
# Verify context accumulation across iterations
assert "Tokyo" in tool_invocation_log[1][1], "Iteration 2 should use data from iteration 1"
assert tool_invocation_log[2][2].get("gdp_trillion_yen") == 115.7, "Iteration 3 should have accumulated GDP data"
# Verify observations feed into subsequent thinking
assert "Tokyo" in result["iterations"][1]["think"], "Iteration 2 thinking should reference observation from iteration 1"
assert "115.7" in result["iterations"][2]["think"], "Iteration 3 thinking should reference GDP from iteration 2"
# Verify final answer synthesis
assert "Tokyo" in result["final_answer"]["answer"]
assert "115.7" in result["final_answer"]["answer"]
assert "14.0" in result["final_answer"]["answer"]
assert "8.26" in result["final_answer"]["answer"], "Should include calculated GDP per capita"
# Verify all 3 tools were invoked in sequence
assert result["tool_invocations"] == 3
def test_multi_iteration_with_dynamic_tool_selection(self):
"""Test multi-iteration ReACT with mocked LLM reasoning dynamically selecting tools
This test simulates how an LLM would dynamically choose tools based on:
1. The original question
2. Previous observations
3. Accumulated context
The mocked LLM reasoning adapts its tool selection based on what it has learned
in previous iterations, mimicking real agent behavior.
"""
# Arrange
question = "What are the main exports of the largest city in Brazil by population?"
# Track reasoning and tool selection
reasoning_log = []
tool_invocation_log = []
def mock_llm_reasoning(question, history, available_tools):
"""Mock LLM that reasons about tool selection based on context"""
# Analyze what we know from history
context = {}
for step in history:
if "observation" in step:
# Extract information from observations
obs = step["observation"]
if "São Paulo" in obs:
context["city"] = "São Paulo"
if "largest city" in obs:
context["is_largest"] = True
if "million" in obs and "population" in obs:
context["has_population"] = True
if "exports" in obs:
context["has_exports"] = True
# Decide next action based on what we know
if not context.get("city"):
# Step 1: Need to find the largest city
reasoning = "I need to find the largest city in Brazil by population"
tool = "geo_search"
args = {"query": "largest city Brazil population"}
elif not context.get("has_population"):
# Step 2: Confirm population data
reasoning = f"I found {context['city']}. Now I need to verify it's the largest by checking population"
tool = "demographic_data"
args = {"query": f"population {context['city']} Brazil"}
elif not context.get("has_exports"):
# Step 3: Get export information
reasoning = f"Confirmed {context['city']} is the largest. Now I need export information"
tool = "economic_data"
args = {"query": f"main exports {context['city']} Brazil"}
else:
# Final: Have all information
reasoning = "I have all the information needed to answer"
tool = "final_answer"
args = None
reasoning_log.append({"reasoning": reasoning, "tool": tool, "context": context.copy()})
return reasoning, tool, args
def mock_geo_search(query):
"""Mock geographic search tool"""
tool_invocation_log.append(("geo_search", query))
if "largest city brazil" in query.lower():
return {
"result": "São Paulo is the largest city in Brazil",
"details": {"city": "São Paulo", "country": "Brazil", "rank": 1}
}
return {"error": "No results found"}
def mock_demographic_data(query):
"""Mock demographic data tool"""
tool_invocation_log.append(("demographic_data", query))
if "são paulo" in query.lower():
return {
"result": "São Paulo has a population of 12.4 million in the city proper, 22.8 million in the metro area",
"details": {"city_population": 12.4, "metro_population": 22.8, "unit": "million"}
}
return {"error": "No demographic data found"}
def mock_economic_data(query):
"""Mock economic data tool"""
tool_invocation_log.append(("economic_data", query))
if "são paulo" in query.lower() and "export" in query.lower():
return {
"result": "São Paulo's main exports include aircraft, vehicles, machinery, coffee, and soybeans",
"details": {
"top_exports": ["aircraft", "vehicles", "machinery", "coffee", "soybeans"],
"export_value_billions_usd": 65.2
}
}
return {"error": "No economic data found"}
# Execute multi-iteration ReACT with dynamic tool selection
def execute_dynamic_react(question, tools, llm_reasoner):
"""Execute ReACT with dynamic LLM-based tool selection"""
iterations = []
history = []
available_tools = list(tools.keys())
max_iterations = 4
for i in range(max_iterations):
# LLM reasons about next action
reasoning, tool_name, args = llm_reasoner(question, history, available_tools)
if tool_name == "final_answer":
# Agent has decided it has enough information
final_answer = {
"reasoning": reasoning,
"answer": "São Paulo, Brazil's largest city with 12.4 million people, " +
"has main exports including aircraft, vehicles, machinery, coffee, and soybeans."
}
break
# Execute selected tool
iteration = {
"iteration": i + 1,
"think": reasoning,
"act": {"tool": tool_name, "args": args},
"observe": None
}
# Get tool result
if tool_name in tools:
result = tools[tool_name](args["query"])
iteration["observe"] = result.get("result", "No information found")
else:
iteration["observe"] = f"Tool {tool_name} not available"
iterations.append(iteration)
# Add to history for next iteration
history.append({
"thought": reasoning,
"action": tool_name,
"args": args,
"observation": iteration["observe"]
})
return {
"iterations": iterations,
"final_answer": final_answer if 'final_answer' in locals() else None,
"reasoning_log": reasoning_log,
"tool_invocations": len(tool_invocation_log)
}
tools = {
"geo_search": mock_geo_search,
"demographic_data": mock_demographic_data,
"economic_data": mock_economic_data
}
# Act
result = execute_dynamic_react(question, tools, mock_llm_reasoning)
# Assert - Verify dynamic multi-iteration execution
assert len(result["iterations"]) == 3, "Should have 3 iterations before final answer"
# Verify reasoning adapts based on observations
assert len(reasoning_log) == 4, "Should have 4 reasoning steps (3 tools + final)"
# Verify first iteration searches for largest city
assert reasoning_log[0]["tool"] == "geo_search"
assert "largest city" in reasoning_log[0]["reasoning"].lower()
assert not reasoning_log[0]["context"].get("city")
# Verify second iteration uses city name from first observation
assert reasoning_log[1]["tool"] == "demographic_data"
assert "São Paulo" in reasoning_log[1]["reasoning"]
assert reasoning_log[1]["context"]["city"] == "São Paulo"
# Verify third iteration builds on previous knowledge
assert reasoning_log[2]["tool"] == "economic_data"
assert "export" in reasoning_log[2]["reasoning"].lower()
assert reasoning_log[2]["context"]["has_population"] is True
# Verify final reasoning has all information
assert reasoning_log[3]["tool"] == "final_answer"
assert reasoning_log[3]["context"]["has_exports"] is True
# Verify tool invocation sequence
assert tool_invocation_log[0][0] == "geo_search"
assert tool_invocation_log[1][0] == "demographic_data"
assert tool_invocation_log[2][0] == "economic_data"
# Verify observations influence subsequent tool selection
assert "São Paulo" in result["iterations"][1]["act"]["args"]["query"]
assert "São Paulo" in result["iterations"][2]["act"]["args"]["query"]
# Verify final answer synthesizes all gathered information
assert result["final_answer"] is not None
assert "São Paulo" in result["final_answer"]["answer"]
assert "12.4 million" in result["final_answer"]["answer"]
assert "aircraft" in result["final_answer"]["answer"]
assert "vehicles" in result["final_answer"]["answer"]
def test_action_name_with_quotes_handling(self):
"""Test that action names with quotes are properly stripped
This test verifies the fix for when LLMs output action names wrapped
in quotes, e.g., Action: "get_bank_balance" instead of Action: get_bank_balance
"""
# Arrange
def parse_react_output(text):
"""Parse ReAct-format output into a dict with thought, action, and args"""
lines = text.strip().split('\n')
thought = None
action = None
args = None
for line in lines:
line = line.strip()
if line.startswith('Think:') or line.startswith('Thought:'):
thought = line.split(':', 1)[1].strip()
elif line.startswith('Action:'):
action = line[7:].strip()
# Strip quotes from action name - this is the fix being tested
while action and action[0] == '"':
action = action[1:]
while action and action[-1] == '"':
action = action[:-1]
elif line.startswith('Args:'):
# Simple args parsing for test
args_text = line[5:].strip()
if args_text:
import json
try:
args = json.loads(args_text)
except json.JSONDecodeError:
args = {"raw": args_text}
return {
"thought": thought,
"action": action,
"args": args
}
# Test cases with various quote patterns
test_cases = [
# Normal case without quotes
(
'Thought: I need to check the bank balance\nAction: get_bank_balance\nArgs: {"account": "12345"}',
"get_bank_balance"
),
# Double quotes around action name
(
'Thought: I need to check the bank balance\nAction: "get_bank_balance"\nArgs: {"account": "12345"}',
"get_bank_balance"
),
# Multiple quotes (nested)
(
'Thought: I need to check the bank balance\nAction: ""get_bank_balance""\nArgs: {"account": "12345"}',
"get_bank_balance"
),
# Action with underscores and quotes
(
'Thought: I need to search\nAction: "search_knowledge_base"\nArgs: {"query": "test"}',
"search_knowledge_base"
),
# Action with hyphens and quotes
(
'Thought: I need to search\nAction: "search-knowledge-base"\nArgs: {"query": "test"}',
"search-knowledge-base"
),
# Edge case: just quotes (should result in empty string)
(
'Thought: Error case\nAction: ""\nArgs: {}',
""
),
# Triple quotes at start and end
(
'Thought: Processing\nAction: """complex_tool"""\nArgs: {}',
"complex_tool"
),
]
# Act & Assert
for llm_output, expected_action in test_cases:
result = parse_react_output(llm_output)
assert result["action"] == expected_action, \
f"Failed to parse action correctly from: {llm_output}\nExpected: {expected_action}, Got: {result['action']}"
# Test with actual tool matching
tools = {
"get_bank_balance": {"description": "Get bank balance"},
"search_knowledge_base": {"description": "Search knowledge"},
"complex_tool": {"description": "Complex operations"}
}
# Simulate tool lookup with quoted action names
quoted_actions = [
'"get_bank_balance"',
'""search_knowledge_base""',
'complex_tool', # without quotes
'"complex_tool"'
]
for quoted_action in quoted_actions:
# Strip quotes as the fix does
clean_action = quoted_action
while clean_action and clean_action[0] == '"':
clean_action = clean_action[1:]
while clean_action and clean_action[-1] == '"':
clean_action = clean_action[:-1]
# Verify the cleaned action exists in tools (except empty string case)
if clean_action:
assert clean_action in tools, \
f"Cleaned action '{clean_action}' from '{quoted_action}' should be in tools"
def test_mcp_tool_arguments_support(self):
"""Test that MCP tools can be configured with arguments and expose them correctly
This test verifies the MCP tool arguments feature where:
1. MCP tool configurations can specify arguments
2. Configuration parsing extracts arguments correctly
3. Arguments are structured properly for tool use
"""
# Define a simple Argument class for testing (mimics the real one)
class TestArgument:
def __init__(self, name, type, description):
self.name = name
self.type = type
self.description = description
# Define a mock McpToolImpl that mimics the new functionality
class MockMcpToolImpl:
def __init__(self, context, mcp_tool_id, arguments=None):
self.context = context
self.mcp_tool_id = mcp_tool_id
self.arguments = arguments or []
def get_arguments(self):
return self.arguments
# Test 1: MCP tool with arguments
test_arguments = [
TestArgument(
name="account_id",
type="string",
description="Bank account identifier"
),
TestArgument(
name="date",
type="string",
description="Date for balance query (optional, format: YYYY-MM-DD)"
)
]
context_mock = lambda service_name: None
mcp_tool_with_args = MockMcpToolImpl(
context=context_mock,
mcp_tool_id="get_bank_balance",
arguments=test_arguments
)
returned_args = mcp_tool_with_args.get_arguments()
# Verify arguments are stored and returned correctly
assert len(returned_args) == 2
assert returned_args[0].name == "account_id"
assert returned_args[0].type == "string"
assert returned_args[0].description == "Bank account identifier"
assert returned_args[1].name == "date"
assert returned_args[1].type == "string"
assert "optional" in returned_args[1].description.lower()
# Test 2: MCP tool without arguments (backward compatibility)
mcp_tool_no_args = MockMcpToolImpl(
context=context_mock,
mcp_tool_id="simple_tool"
)
returned_args_empty = mcp_tool_no_args.get_arguments()
assert len(returned_args_empty) == 0
assert returned_args_empty == []
# Test 3: MCP tool with empty arguments list
mcp_tool_empty_args = MockMcpToolImpl(
context=context_mock,
mcp_tool_id="another_tool",
arguments=[]
)
returned_args_explicit_empty = mcp_tool_empty_args.get_arguments()
assert len(returned_args_explicit_empty) == 0
assert returned_args_explicit_empty == []
# Test 4: Configuration parsing simulation
def simulate_config_parsing(config_data):
"""Simulate how service.py parses MCP tool configuration"""
config_args = config_data.get("arguments", [])
arguments = [
TestArgument(
name=arg.get("name"),
type=arg.get("type"),
description=arg.get("description")
)
for arg in config_args
]
return arguments
# Test configuration with arguments
config_with_args = {
"type": "mcp-tool",
"name": "get_bank_balance",
"description": "Get bank account balance",
"mcp-tool": "get_bank_balance",
"arguments": [
{
"name": "account_id",
"type": "string",
"description": "Bank account identifier"
},
{
"name": "date",
"type": "string",
"description": "Date for balance query (optional)"
}
]
}
parsed_args = simulate_config_parsing(config_with_args)
assert len(parsed_args) == 2
assert parsed_args[0].name == "account_id"
assert parsed_args[1].name == "date"
# Test configuration without arguments
config_without_args = {
"type": "mcp-tool",
"name": "simple_tool",
"description": "Simple MCP tool",
"mcp-tool": "simple_tool"
}
parsed_args_empty = simulate_config_parsing(config_without_args)
assert len(parsed_args_empty) == 0
# Test 5: Argument structure validation
def validate_argument_structure(arg):
"""Validate that an argument has required fields"""
required_fields = ['name', 'type', 'description']
return all(hasattr(arg, field) and getattr(arg, field) for field in required_fields)
# Validate all parsed arguments have proper structure
for arg in parsed_args:
assert validate_argument_structure(arg), f"Argument {arg.name} missing required fields"
# Test 6: Prompt template integration simulation
def simulate_prompt_template_rendering(tools):
"""Simulate how agent prompts include tool arguments"""
tool_descriptions = []
for tool in tools:
tool_desc = f"- **{tool.name}**: {tool.description}"
# Add argument details if present
for arg in tool.arguments:
tool_desc += f"\n - Required: `\"{arg.name}\"` ({arg.type}): {arg.description}"
tool_descriptions.append(tool_desc)
return "\n".join(tool_descriptions)
# Create mock tools with our MCP tool
class MockTool:
def __init__(self, name, description, arguments):
self.name = name
self.description = description
self.arguments = arguments
mock_tools = [
MockTool("search", "Search the web", []), # Tool without arguments
MockTool("get_bank_balance", "Get bank account balance", parsed_args) # MCP tool with arguments
]
prompt_section = simulate_prompt_template_rendering(mock_tools)
# Verify the prompt includes MCP tool arguments
assert "get_bank_balance" in prompt_section
assert "account_id" in prompt_section
assert "Bank account identifier" in prompt_section
assert "date" in prompt_section
assert "(string)" in prompt_section
assert "Required:" in prompt_section
# Verify tools without arguments still work
assert "search" in prompt_section
assert "Search the web" in prompt_section
def test_error_handling_in_react_cycle(self):
"""Test error handling during ReAct execution"""
# Arrange
@ -474,4 +1107,4 @@ Answer: The capital of France is Paris."""
assert "error" in result
else:
assert result["tool_name"] == expected_tool
assert result["parameters"] == expected_params
assert result["parameters"] == expected_params
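
The quote handling exercised by `test_action_name_with_quotes_handling` removes every leading and trailing double quote with two `while` loops. As a minimal sketch, assuming only double quotes need stripping, `str.strip('"')` gives the same result in one call. `strip_action_quotes` is a hypothetical helper name and is not taken from the agent implementation.

```python
def strip_action_quotes(action: str) -> str:
    """Trim whitespace, then drop any run of double quotes at either end."""
    return action.strip().strip('"')


# The same inputs the test feeds through its while loops.
cases = {
    'get_bank_balance': 'get_bank_balance',
    '"get_bank_balance"': 'get_bank_balance',
    '""get_bank_balance""': 'get_bank_balance',
    '"search-knowledge-base"': 'search-knowledge-base',
    '"""complex_tool"""': 'complex_tool',
    '""': '',
}

for raw, expected in cases.items():
    assert strip_action_quotes(raw) == expected, (raw, expected)
```

Quotes inside the name are left alone, and single quotes are not stripped, which matches the behaviour of the loops in the test.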

View file

@ -0,0 +1,458 @@
"""
Unit tests for CLI configuration commands.
Tests the business logic of list/get/put/delete config item commands
while mocking the Config API.
"""
import pytest
import json
import sys
from unittest.mock import Mock, patch, MagicMock
from io import StringIO
from trustgraph.cli.list_config_items import list_config_items, main as list_main
from trustgraph.cli.get_config_item import get_config_item, main as get_main
from trustgraph.cli.put_config_item import put_config_item, main as put_main
from trustgraph.cli.delete_config_item import delete_config_item, main as delete_main
from trustgraph.api.types import ConfigKey, ConfigValue
@pytest.fixture
def mock_api():
"""Mock Api instance with config() method."""
mock_api_instance = Mock()
mock_config = Mock()
mock_api_instance.config.return_value = mock_config
return mock_api_instance, mock_config
@pytest.fixture
def sample_config_keys():
"""Sample configuration keys."""
return ["template-1", "template-2", "system-prompt"]
@pytest.fixture
def sample_config_value():
"""Sample configuration value."""
return ConfigValue(
type="prompt",
key="template-1",
value="You are a helpful assistant. Please respond to: {query}"
)
class TestListConfigItems:
"""Test the list_config_items function."""
@patch('trustgraph.cli.list_config_items.Api')
def test_list_config_items_text_format(self, mock_api_class, mock_api, sample_config_keys, capsys):
"""Test listing config items in text format."""
mock_api_class.return_value, mock_config = mock_api
mock_config.list.return_value = sample_config_keys
list_config_items("http://test.com", "prompt", "text")
captured = capsys.readouterr()
output_lines = captured.out.strip().split('\n')
assert len(output_lines) == 3
assert "template-1" in output_lines
assert "template-2" in output_lines
assert "system-prompt" in output_lines
mock_config.list.assert_called_once_with("prompt")
@patch('trustgraph.cli.list_config_items.Api')
def test_list_config_items_json_format(self, mock_api_class, mock_api, sample_config_keys, capsys):
"""Test listing config items in JSON format."""
mock_api_class.return_value, mock_config = mock_api
mock_config.list.return_value = sample_config_keys
list_config_items("http://test.com", "prompt", "json")
captured = capsys.readouterr()
output = json.loads(captured.out.strip())
assert output == sample_config_keys
mock_config.list.assert_called_once_with("prompt")
@patch('trustgraph.cli.list_config_items.Api')
def test_list_config_items_empty_list(self, mock_api_class, mock_api, capsys):
"""Test listing when no config items exist."""
mock_api_class.return_value, mock_config = mock_api
mock_config.list.return_value = []
list_config_items("http://test.com", "nonexistent", "text")
captured = capsys.readouterr()
assert captured.out.strip() == ""
mock_config.list.assert_called_once_with("nonexistent")
def test_list_main_parses_args_correctly(self):
"""Test that list main() parses arguments correctly."""
test_args = [
'tg-list-config-items',
'--type', 'prompt',
'--format', 'json',
'--api-url', 'http://custom.com'
]
with patch('sys.argv', test_args), \
patch('trustgraph.cli.list_config_items.list_config_items') as mock_list:
list_main()
mock_list.assert_called_once_with(
url='http://custom.com',
config_type='prompt',
format_type='json'
)
def test_list_main_uses_defaults(self):
"""Test that list main() uses default values."""
test_args = [
'tg-list-config-items',
'--type', 'prompt'
]
with patch('sys.argv', test_args), \
patch('trustgraph.cli.list_config_items.list_config_items') as mock_list:
list_main()
mock_list.assert_called_once_with(
url='http://localhost:8088/',
config_type='prompt',
format_type='text'
)
class TestGetConfigItem:
"""Test the get_config_item function."""
@patch('trustgraph.cli.get_config_item.Api')
def test_get_config_item_text_format(self, mock_api_class, mock_api, sample_config_value, capsys):
"""Test getting config item in text format."""
mock_api_class.return_value, mock_config = mock_api
mock_config.get.return_value = [sample_config_value]
get_config_item("http://test.com", "prompt", "template-1", "text")
captured = capsys.readouterr()
assert captured.out.strip() == sample_config_value.value
# Verify ConfigKey was constructed correctly
call_args = mock_config.get.call_args[0][0]
assert len(call_args) == 1
config_key = call_args[0]
assert config_key.type == "prompt"
assert config_key.key == "template-1"
@patch('trustgraph.cli.get_config_item.Api')
def test_get_config_item_json_format(self, mock_api_class, mock_api, sample_config_value, capsys):
"""Test getting config item in JSON format."""
mock_api_class.return_value, mock_config = mock_api
mock_config.get.return_value = [sample_config_value]
get_config_item("http://test.com", "prompt", "template-1", "json")
captured = capsys.readouterr()
output = json.loads(captured.out.strip())
assert output == sample_config_value.value
mock_config.get.assert_called_once()
@patch('trustgraph.cli.get_config_item.Api')
def test_get_config_item_not_found(self, mock_api_class, mock_api):
"""Test getting non-existent config item raises exception."""
mock_api_class.return_value, mock_config = mock_api
mock_config.get.return_value = []
with pytest.raises(Exception, match="Configuration item not found"):
get_config_item("http://test.com", "prompt", "nonexistent", "text")
def test_get_main_parses_args_correctly(self):
"""Test that get main() parses arguments correctly."""
test_args = [
'tg-get-config-item',
'--type', 'prompt',
'--key', 'template-1',
'--format', 'json',
'--api-url', 'http://custom.com'
]
with patch('sys.argv', test_args), \
patch('trustgraph.cli.get_config_item.get_config_item') as mock_get:
get_main()
mock_get.assert_called_once_with(
url='http://custom.com',
config_type='prompt',
key='template-1',
format_type='json'
)
class TestPutConfigItem:
"""Test the put_config_item function."""
@patch('trustgraph.cli.put_config_item.Api')
def test_put_config_item_with_value(self, mock_api_class, mock_api, capsys):
"""Test putting config item with command line value."""
mock_api_class.return_value, mock_config = mock_api
put_config_item("http://test.com", "prompt", "new-template", "Custom prompt: {input}")
captured = capsys.readouterr()
assert "Configuration item set: prompt/new-template" in captured.out
# Verify ConfigValue was constructed correctly
call_args = mock_config.put.call_args[0][0]
assert len(call_args) == 1
config_value = call_args[0]
assert config_value.type == "prompt"
assert config_value.key == "new-template"
assert config_value.value == "Custom prompt: {input}"
@patch('trustgraph.cli.put_config_item.Api')
def test_put_config_item_multiline_value(self, mock_api_class, mock_api):
"""Test putting config item with multiline value."""
mock_api_class.return_value, mock_config = mock_api
multiline_value = "Line 1\nLine 2\nLine 3"
put_config_item("http://test.com", "prompt", "multiline-template", multiline_value)
call_args = mock_config.put.call_args[0][0]
config_value = call_args[0]
assert config_value.value == multiline_value
def test_put_main_with_value_arg(self):
"""Test put main() with --value argument."""
test_args = [
'tg-put-config-item',
'--type', 'prompt',
'--key', 'new-template',
'--value', 'Custom prompt: {input}',
'--api-url', 'http://custom.com'
]
with patch('sys.argv', test_args), \
patch('trustgraph.cli.put_config_item.put_config_item') as mock_put:
put_main()
mock_put.assert_called_once_with(
url='http://custom.com',
config_type='prompt',
key='new-template',
value='Custom prompt: {input}'
)
def test_put_main_with_stdin_arg(self):
"""Test put main() with --stdin argument."""
test_args = [
'tg-put-config-item',
'--type', 'prompt',
'--key', 'stdin-template',
'--stdin'
]
stdin_content = "Content from stdin\nMultiple lines"
with patch('sys.argv', test_args), \
patch('sys.stdin', StringIO(stdin_content)), \
patch('trustgraph.cli.put_config_item.put_config_item') as mock_put:
put_main()
mock_put.assert_called_once_with(
url='http://localhost:8088/',
config_type='prompt',
key='stdin-template',
value=stdin_content
)
def test_put_main_mutually_exclusive_args(self):
"""Test that --value and --stdin are mutually exclusive."""
test_args = [
'tg-put-config-item',
'--type', 'prompt',
'--key', 'template',
'--value', 'test',
'--stdin'
]
with patch('sys.argv', test_args):
with pytest.raises(SystemExit):
put_main()
class TestDeleteConfigItem:
"""Test the delete_config_item function."""
@patch('trustgraph.cli.delete_config_item.Api')
def test_delete_config_item(self, mock_api_class, mock_api, capsys):
"""Test deleting config item."""
mock_api_class.return_value, mock_config = mock_api
delete_config_item("http://test.com", "prompt", "old-template")
captured = capsys.readouterr()
assert "Configuration item deleted: prompt/old-template" in captured.out
# Verify ConfigKey was constructed correctly
call_args = mock_config.delete.call_args[0][0]
assert len(call_args) == 1
config_key = call_args[0]
assert config_key.type == "prompt"
assert config_key.key == "old-template"
def test_delete_main_parses_args_correctly(self):
"""Test that delete main() parses arguments correctly."""
test_args = [
'tg-delete-config-item',
'--type', 'prompt',
'--key', 'old-template',
'--api-url', 'http://custom.com'
]
with patch('sys.argv', test_args), \
patch('trustgraph.cli.delete_config_item.delete_config_item') as mock_delete:
delete_main()
mock_delete.assert_called_once_with(
url='http://custom.com',
config_type='prompt',
key='old-template'
)
class TestErrorHandling:
"""Test error handling scenarios."""
@patch('trustgraph.cli.list_config_items.Api')
def test_list_handles_api_exception(self, mock_api_class, capsys):
"""Test that list command handles API exceptions."""
mock_api_class.side_effect = Exception("API connection failed")
list_main_with_args(['--type', 'prompt'])
captured = capsys.readouterr()
assert "Exception: API connection failed" in captured.out
@patch('trustgraph.cli.get_config_item.Api')
def test_get_handles_api_exception(self, mock_api_class, capsys):
"""Test that get command handles API exceptions."""
mock_api_class.side_effect = Exception("API connection failed")
get_main_with_args(['--type', 'prompt', '--key', 'test'])
captured = capsys.readouterr()
assert "Exception: API connection failed" in captured.out
@patch('trustgraph.cli.put_config_item.Api')
def test_put_handles_api_exception(self, mock_api_class, capsys):
"""Test that put command handles API exceptions."""
mock_api_class.side_effect = Exception("API connection failed")
put_main_with_args(['--type', 'prompt', '--key', 'test', '--value', 'test'])
captured = capsys.readouterr()
assert "Exception: API connection failed" in captured.out
@patch('trustgraph.cli.delete_config_item.Api')
def test_delete_handles_api_exception(self, mock_api_class, capsys):
"""Test that delete command handles API exceptions."""
mock_api_class.side_effect = Exception("API connection failed")
delete_main_with_args(['--type', 'prompt', '--key', 'test'])
captured = capsys.readouterr()
assert "Exception: API connection failed" in captured.out
class TestDataValidation:
"""Test data validation and edge cases."""
@patch('trustgraph.cli.get_config_item.Api')
def test_get_empty_string_value(self, mock_api_class, mock_api, capsys):
"""Test getting config item with empty string value."""
mock_api_class.return_value, mock_config = mock_api
empty_value = ConfigValue(type="prompt", key="empty", value="")
mock_config.get.return_value = [empty_value]
get_config_item("http://test.com", "prompt", "empty", "text")
captured = capsys.readouterr()
assert captured.out == "\n" # Just a newline from print()
@patch('trustgraph.cli.put_config_item.Api')
def test_put_empty_string_value(self, mock_api_class, mock_api):
"""Test putting config item with empty string value."""
mock_api_class.return_value, mock_config = mock_api
put_config_item("http://test.com", "prompt", "empty", "")
call_args = mock_config.put.call_args[0][0]
config_value = call_args[0]
assert config_value.value == ""
@patch('trustgraph.cli.get_config_item.Api')
def test_get_special_characters_value(self, mock_api_class, mock_api, capsys):
"""Test getting config item with special characters."""
mock_api_class.return_value, mock_config = mock_api
special_value = ConfigValue(
type="prompt",
key="special",
value="Special chars: äöü 中文 🌟 \"quotes\" 'apostrophes'"
)
mock_config.get.return_value = [special_value]
get_config_item("http://test.com", "prompt", "special", "text")
captured = capsys.readouterr()
assert "äöü 中文 🌟" in captured.out
assert '"quotes"' in captured.out
# Helper functions for testing main() with custom args
def list_main_with_args(args):
"""Helper to test list_main with custom arguments."""
test_args = ['tg-list-config-items'] + args
with patch('sys.argv', test_args):
try:
list_main()
except SystemExit:
pass
def get_main_with_args(args):
"""Helper to test get_main with custom arguments."""
test_args = ['tg-get-config-item'] + args
with patch('sys.argv', test_args):
try:
get_main()
except SystemExit:
pass
def put_main_with_args(args):
"""Helper to test put_main with custom arguments."""
test_args = ['tg-put-config-item'] + args
with patch('sys.argv', test_args):
try:
put_main()
except SystemExit:
pass
def delete_main_with_args(args):
"""Helper to test delete_main with custom arguments."""
test_args = ['tg-delete-config-item'] + args
with patch('sys.argv', test_args):
try:
delete_main()
except SystemExit:
pass
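Note on the helpers above: each one patches `sys.argv` and then calls the CLI entry point, so `argparse` reads the test's arguments rather than those of the pytest process. A minimal standalone sketch of the same pattern follows. The names `demo-list`, `list_items` and `calls` are hypothetical and exist only for this illustration; they are not part of the trustgraph CLI.

```python
import argparse
import sys
from unittest.mock import patch

# Records what the hypothetical worker function was called with.
calls = []


def list_items(url, config_type):
    # Stand-in for the real worker; it only records its arguments.
    calls.append((url, config_type))


def main():
    parser = argparse.ArgumentParser(prog="demo-list")
    parser.add_argument("--type", required=True)
    parser.add_argument("-u", "--api-url", default="http://localhost:8088/")
    # parse_args() with no arguments reads sys.argv[1:] at call time,
    # which is why patching sys.argv is enough to drive it.
    args = parser.parse_args()
    list_items(url=args.api_url, config_type=args.type)


# Replace sys.argv only for the duration of the call.
with patch.object(sys, "argv", ["demo-list", "--type", "prompt"]):
    main()
```

The `try/except SystemExit` in the helpers matters because `argparse` exits the process on a usage error; without it a bad argument list would abort the test instead of failing an assertion.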

View file

@ -0,0 +1,272 @@
"""
Unit tests for Object Validation Logic
Tests the validation logic for extracted objects against schemas,
including handling of nested JSON format issues and field validation.
"""
import pytest
import json
from trustgraph.schema import RowSchema, Field
@pytest.fixture
def cities_schema():
"""Cities schema matching the production schema"""
fields = []
# Create fields with proper attribute assignment
f1 = Field()
f1.name = "city"
f1.type = "string"
f1.primary = True
f1.required = True
f1.description = "City name"
fields.append(f1)
f2 = Field()
f2.name = "country"
f2.type = "string"
f2.primary = True
f2.required = True
f2.description = "Country name"
fields.append(f2)
f3 = Field()
f3.name = "population"
f3.type = "integer"
f3.primary = False
f3.required = True
f3.description = "Population count"
fields.append(f3)
f4 = Field()
f4.name = "climate"
f4.type = "string"
f4.primary = False
f4.required = True
f4.description = "Climate type"
fields.append(f4)
f5 = Field()
f5.name = "primary_language"
f5.type = "string"
f5.primary = False
f5.required = True
f5.description = "Primary language spoken"
fields.append(f5)
f6 = Field()
f6.name = "currency"
f6.type = "string"
f6.primary = False
f6.required = True
f6.description = "Currency used"
fields.append(f6)
schema = RowSchema()
schema.name = "Cities"
schema.description = "City demographics"
schema.fields = fields
return schema
@pytest.fixture
def validator():
"""Create a mock processor with just the validation method"""
from unittest.mock import MagicMock
from trustgraph.extract.kg.objects.processor import Processor
# Create a mock processor
mock_processor = MagicMock()
# Bind the validate_object method to the mock
mock_processor.validate_object = Processor.validate_object.__get__(mock_processor, Processor)
return mock_processor
class TestObjectValidation:
"""Test cases for object validation logic"""
def test_valid_object_passes_validation(self, validator, cities_schema):
"""Test that a valid object passes validation"""
valid_obj = {
"city": "Shanghai",
"country": "China",
"population": "30482140",
"climate": "Humid subtropical",
"primary_language": "Mandarin Chinese",
"currency": "Chinese Yuan (CNY)"
}
result = validator.validate_object(valid_obj, cities_schema, "Cities")
assert result is True
def test_nested_json_format_fails_validation(self, validator, cities_schema):
"""Test that nested JSON format is detected and fails validation"""
nested_obj = {
"Cities": '{"city": "Jakarta", "country": "Indonesia", "population": 11634078, "climate": "Tropical monsoon", "primary_language": "Indonesian", "currency": "Indonesian Rupiah (IDR)"}'
}
result = validator.validate_object(nested_obj, cities_schema, "Cities")
assert result is False
def test_missing_required_field_fails_validation(self, validator, cities_schema):
"""Test that missing required field fails validation"""
missing_field_obj = {
"city": "London",
"country": "UK",
"population": "9000000",
"climate": "Temperate",
# Missing primary_language (required)
"currency": "GBP"
}
result = validator.validate_object(missing_field_obj, cities_schema, "Cities")
assert result is False
def test_null_primary_key_fails_validation(self, validator, cities_schema):
"""Test that null primary key field fails validation"""
null_primary_obj = {
"city": None, # Primary key is null
"country": "France",
"population": "2000000",
"climate": "Mediterranean",
"primary_language": "French",
"currency": "EUR"
}
result = validator.validate_object(null_primary_obj, cities_schema, "Cities")
assert result is False
def test_missing_primary_key_fails_validation(self, validator, cities_schema):
"""Test that missing primary key field fails validation"""
missing_primary_obj = {
# Missing city (primary key)
"country": "Spain",
"population": "3000000",
"climate": "Mediterranean",
"primary_language": "Spanish",
"currency": "EUR"
}
result = validator.validate_object(missing_primary_obj, cities_schema, "Cities")
assert result is False
def test_invalid_integer_type_fails_validation(self, validator, cities_schema):
"""Test that invalid integer value fails validation"""
invalid_type_obj = {
"city": "Tokyo",
"country": "Japan",
"population": "not_a_number", # Invalid integer
"climate": "Humid subtropical",
"primary_language": "Japanese",
"currency": "JPY"
}
result = validator.validate_object(invalid_type_obj, cities_schema, "Cities")
assert result is False
def test_numeric_string_for_integer_passes_validation(self, validator, cities_schema):
"""Test that numeric string for integer field passes validation"""
numeric_string_obj = {
"city": "Beijing",
"country": "China",
"population": "21540000", # String that can be converted to int
"climate": "Continental",
"primary_language": "Mandarin",
"currency": "CNY"
}
result = validator.validate_object(numeric_string_obj, cities_schema, "Cities")
assert result is True
def test_integer_value_for_integer_field_passes_validation(self, validator, cities_schema):
"""Test that actual integer value for integer field passes validation"""
integer_obj = {
"city": "Mumbai",
"country": "India",
"population": 20185064, # Actual integer
"climate": "Tropical",
"primary_language": "Hindi",
"currency": "INR"
}
result = validator.validate_object(integer_obj, cities_schema, "Cities")
assert result is True
def test_non_dict_object_fails_validation(self, validator, cities_schema):
"""Test that non-dictionary object fails validation"""
non_dict_obj = "This is not a dictionary"
result = validator.validate_object(non_dict_obj, cities_schema, "Cities")
assert result is False
def test_optional_field_missing_passes_validation(self, validator):
"""Test that missing optional field passes validation"""
# Create schema with optional field
fields = [
Field(name="id", type="string", primary=True, required=True),
Field(name="name", type="string", required=True),
Field(name="description", type="string", required=False), # Optional
]
schema = RowSchema(name="TestSchema", fields=fields)
obj = {
"id": "123",
"name": "Test Name",
# description is missing but optional
}
result = validator.validate_object(obj, schema, "TestSchema")
assert result is True
def test_float_type_validation(self, validator):
"""Test float type validation"""
fields = [
Field(name="id", type="string", primary=True, required=True),
Field(name="price", type="float", required=True),
]
schema = RowSchema(name="Product", fields=fields)
# Valid float as string
obj1 = {"id": "1", "price": "19.99"}
assert validator.validate_object(obj1, schema, "Product") is True
# Valid float
obj2 = {"id": "2", "price": 19.99}
assert validator.validate_object(obj2, schema, "Product") is True
# Valid integer (can be float)
obj3 = {"id": "3", "price": 20}
assert validator.validate_object(obj3, schema, "Product") is True
# Invalid float
obj4 = {"id": "4", "price": "not_a_float"}
assert validator.validate_object(obj4, schema, "Product") is False
def test_boolean_type_validation(self, validator):
"""Test boolean type validation"""
fields = [
Field(name="id", type="string", primary=True, required=True),
Field(name="active", type="boolean", required=True),
]
schema = RowSchema(name="User", fields=fields)
# Valid boolean
obj1 = {"id": "1", "active": True}
assert validator.validate_object(obj1, schema, "User") is True
# Valid boolean as string
obj2 = {"id": "2", "active": "true"}
assert validator.validate_object(obj2, schema, "User") is True
# Valid boolean as integer
obj3 = {"id": "3", "active": 1}
assert validator.validate_object(obj3, schema, "User") is True
# Invalid boolean type
obj4 = {"id": "4", "active": []}
assert validator.validate_object(obj4, schema, "User") is False
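Note on the type tests above: they pin down a lenient rule for numeric fields, where a string is accepted if it converts cleanly and a native number is accepted as is. The sketch below restates that rule as a standalone function. `value_matches_type` is a hypothetical helper written for this note, not the `Processor.validate_object` method, and it covers only the `integer` and `float` cases.

```python
from typing import Any


def value_matches_type(value: Any, field_type: str) -> bool:
    """Hypothetical helper: True if `value` is acceptable for `field_type`."""
    if field_type in ("integer", "float"):
        convert = int if field_type == "integer" else float
        if isinstance(value, str):
            try:
                convert(value)  # numeric strings are accepted
            except ValueError:
                return False
            return True
        # Native numbers pass; a float also passes the integer check here.
        return isinstance(value, (int, float))
    # Other field types are not checked in this sketch.
    return True


checks = {
    "numeric string": value_matches_type("21540000", "integer"),
    "native int": value_matches_type(20185064, "integer"),
    "bad string": value_matches_type("not_a_number", "integer"),
    "float string": value_matches_type("19.99", "float"),
    "int as float": value_matches_type(20, "float"),
    "list": value_matches_type([], "float"),
}
```

One consequence worth knowing: a decimal string such as `"19.99"` fails the `integer` check, because `int("19.99")` raises `ValueError`, while the native float `19.99` passes it.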

View file

@ -188,16 +188,36 @@ class TestVertexAIProcessorSimple(IsolatedAsyncioTestCase):
assert result.out_token == 0
assert result.model == 'gemini-2.0-flash-001'
@patch('trustgraph.model.text_completion.vertexai.llm.google.auth.default')
@patch('trustgraph.model.text_completion.vertexai.llm.service_account')
@patch('trustgraph.model.text_completion.vertexai.llm.vertexai')
@patch('trustgraph.model.text_completion.vertexai.llm.GenerativeModel')
@patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
@patch('trustgraph.base.llm_service.LlmService.__init__')
async def test_processor_initialization_without_private_key(self, mock_llm_init, mock_async_init, mock_generative_model, mock_vertexai, mock_service_account, mock_auth_default):
"""Test processor initialization without private key (uses default credentials)"""
# Arrange
mock_async_init.return_value = None
mock_llm_init.return_value = None
# Mock google.auth.default() to return credentials and project ID
mock_credentials = MagicMock()
mock_auth_default.return_value = (mock_credentials, "test-project-123")
# Mock GenerativeModel
mock_model = MagicMock()
mock_generative_model.return_value = mock_model
config = {
'region': 'us-central1',
@ -210,9 +230,22 @@ class TestVertexAIProcessorSimple(IsolatedAsyncioTestCase):
'id': 'test-processor'
}
# Act
processor = Processor(**config)
# Assert
assert processor.model == 'gemini-2.0-flash-001'
mock_auth_default.assert_called_once()
mock_vertexai.init.assert_called_once_with(
location='us-central1',
project='test-project-123'
)
@patch('trustgraph.model.text_completion.vertexai.llm.service_account')
@patch('trustgraph.model.text_completion.vertexai.llm.vertexai')
@ -292,12 +325,20 @@ class TestVertexAIProcessorSimple(IsolatedAsyncioTestCase):
# Verify service account was called with custom key
mock_service_account.Credentials.from_service_account_file.assert_called_once_with('custom-key.json')
# Verify that api_params dict has the correct values (this is accessible)
assert processor.api_params["temperature"] == 0.7
assert processor.api_params["max_output_tokens"] == 4096
assert processor.api_params["top_p"] == 1.0
assert processor.api_params["top_k"] == 32
@patch('trustgraph.model.text_completion.vertexai.llm.service_account')
@patch('trustgraph.model.text_completion.vertexai.llm.vertexai')
@ -392,6 +433,61 @@ class TestVertexAIProcessorSimple(IsolatedAsyncioTestCase):
# The prompt should be "" + "\n\n" + "" = "\n\n"
assert call_args[0][0] == "\n\n"
@patch('trustgraph.model.text_completion.vertexai.llm.AnthropicVertex')
@patch('trustgraph.model.text_completion.vertexai.llm.service_account')
@patch('trustgraph.base.async_processor.AsyncProcessor.__init__')
@patch('trustgraph.base.llm_service.LlmService.__init__')
async def test_anthropic_processor_initialization_with_private_key(self, mock_llm_init, mock_async_init, mock_service_account, mock_anthropic_vertex):
"""Test Anthropic processor initialization with private key credentials"""
# Arrange
mock_async_init.return_value = None
mock_llm_init.return_value = None
mock_credentials = MagicMock()
mock_credentials.project_id = "test-project-456"
mock_service_account.Credentials.from_service_account_file.return_value = mock_credentials
# Mock AnthropicVertex
mock_anthropic_client = MagicMock()
mock_anthropic_vertex.return_value = mock_anthropic_client
config = {
'region': 'us-west1',
'model': 'claude-3-sonnet@20240229', # Anthropic model
'temperature': 0.5,
'max_output': 2048,
'private_key': 'anthropic-key.json',
'concurrency': 1,
'taskgroup': AsyncMock(),
'id': 'test-anthropic-processor'
}
# Act
processor = Processor(**config)
# Assert
assert processor.model == 'claude-3-sonnet@20240229'
assert processor.is_anthropic == True
# Verify service account was called with private key
mock_service_account.Credentials.from_service_account_file.assert_called_once_with('anthropic-key.json')
# Verify AnthropicVertex was initialized with credentials
mock_anthropic_vertex.assert_called_once_with(
region='us-west1',
project_id='test-project-456',
credentials=mock_credentials
)
# Verify api_params are set correctly
assert processor.api_params["temperature"] == 0.5
assert processor.api_params["max_output_tokens"] == 2048
assert processor.api_params["top_p"] == 1.0
assert processor.api_params["top_k"] == 32
if __name__ == '__main__':
pytest.main([__file__])

View file

@ -18,15 +18,15 @@ class FlowRequestTranslator(MessageTranslator):
def from_pulsar(self, obj: FlowRequest) -> Dict[str, Any]:
result = {}
if obj.operation:
if obj.operation is not None:
result["operation"] = obj.operation
if obj.class_name:
if obj.class_name is not None:
result["class-name"] = obj.class_name
if obj.class_definition:
if obj.class_definition is not None:
result["class-definition"] = obj.class_definition
if obj.description:
if obj.description is not None:
result["description"] = obj.description
if obj.flow_id:
if obj.flow_id is not None:
result["flow-id"] = obj.flow_id
return result
@ -41,19 +41,19 @@ class FlowResponseTranslator(MessageTranslator):
def from_pulsar(self, obj: FlowResponse) -> Dict[str, Any]:
result = {}
if obj.class_names:
if obj.class_names is not None:
result["class-names"] = obj.class_names
if obj.flow_ids:
if obj.flow_ids is not None:
result["flow-ids"] = obj.flow_ids
if obj.class_definition:
if obj.class_definition is not None:
result["class-definition"] = obj.class_definition
if obj.flow:
if obj.flow is not None:
result["flow"] = obj.flow
if obj.description:
if obj.description is not None:
result["description"] = obj.description
return result
def from_response_with_completion(self, obj: FlowResponse) -> Tuple[Dict[str, Any], bool]:
"""Returns (response_dict, is_final)"""
return self.from_pulsar(obj), True
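Note on this change: the translators move from truthiness checks (`if obj.flow_ids:`) to explicit `is not None` checks. The difference only shows up for values that are present but falsy, such as an empty list or an empty string. The sketch below illustrates it with a hypothetical `FakeResponse` dataclass standing in for `FlowResponse`. It models two fields only and is not the real Pulsar schema class.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FakeResponse:
    # Hypothetical stand-in for FlowResponse; only two fields are modelled.
    flow_ids: Optional[List[str]] = None
    description: Optional[str] = None


def translate_truthy(obj: FakeResponse) -> Dict[str, Any]:
    # Old behaviour: falsy values ([], "") are silently omitted.
    result: Dict[str, Any] = {}
    if obj.flow_ids:
        result["flow-ids"] = obj.flow_ids
    if obj.description:
        result["description"] = obj.description
    return result


def translate_not_none(obj: FakeResponse) -> Dict[str, Any]:
    # New behaviour: only None is treated as "absent".
    result: Dict[str, Any] = {}
    if obj.flow_ids is not None:
        result["flow-ids"] = obj.flow_ids
    if obj.description is not None:
        result["description"] = obj.description
    return result


empty = FakeResponse(flow_ids=[], description="")
old_result = translate_truthy(empty)
new_result = translate_not_none(empty)
```

With the old check, a caller asking for the list of flows when none exist would get a response with no `flow-ids` key at all; with the new check it gets an explicit empty list.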

View file

@ -78,6 +78,10 @@ tg-unload-kg-core = "trustgraph.cli.unload_kg_core:main"
tg-start-library-processing = "trustgraph.cli.start_library_processing:main"
tg-stop-flow = "trustgraph.cli.stop_flow:main"
tg-stop-library-processing = "trustgraph.cli.stop_library_processing:main"
tg-list-config-items = "trustgraph.cli.list_config_items:main"
tg-get-config-item = "trustgraph.cli.get_config_item:main"
tg-put-config-item = "trustgraph.cli.put_config_item:main"
tg-delete-config-item = "trustgraph.cli.delete_config_item:main"
[tool.setuptools.packages.find]
include = ["trustgraph*"]

View file

@ -0,0 +1,61 @@
"""
Deletes a configuration item
"""
import argparse
import os
from trustgraph.api import Api
from trustgraph.api.types import ConfigKey
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def delete_config_item(url, config_type, key):
api = Api(url).config()
config_key = ConfigKey(type=config_type, key=key)
api.delete([config_key])
print(f"Configuration item deleted: {config_type}/{key}")
def main():
parser = argparse.ArgumentParser(
prog='tg-delete-config-item',
description=__doc__,
)
parser.add_argument(
'--type',
required=True,
help='Configuration type',
)
parser.add_argument(
'--key',
required=True,
help='Configuration key',
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
args = parser.parse_args()
try:
delete_config_item(
url=args.api_url,
config_type=args.type,
key=args.key,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,78 @@
"""
Gets a specific configuration item
"""
import argparse
import os
import json
from trustgraph.api import Api
from trustgraph.api.types import ConfigKey
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def get_config_item(url, config_type, key, format_type):
api = Api(url).config()
config_key = ConfigKey(type=config_type, key=key)
values = api.get([config_key])
if not values:
raise Exception(f"Configuration item not found: {config_type}/{key}")
value = values[0].value
if format_type == "json":
print(json.dumps(value))
else:
print(value)
def main():
parser = argparse.ArgumentParser(
prog='tg-get-config-item',
description=__doc__,
)
parser.add_argument(
'--type',
required=True,
help='Configuration type',
)
parser.add_argument(
'--key',
required=True,
help='Configuration key',
)
parser.add_argument(
'--format',
choices=['text', 'json'],
default='text',
help='Output format (default: text)',
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
args = parser.parse_args()
try:
get_config_item(
url=args.api_url,
config_type=args.type,
key=args.key,
format_type=args.format,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()
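Note on `--format`: for a string value, the `text` branch prints the value as stored, while the `json` branch prints a JSON string literal with quotes and escapes. A minimal sketch of that branch follows. `render_value` is a hypothetical helper written for this note; the real function prints directly instead of returning a string.

```python
import json


def render_value(value: str, format_type: str) -> str:
    """Hypothetical helper mirroring the text/json branch of get_config_item."""
    if format_type == "json":
        return json.dumps(value)
    return value


template = 'Say "hi"\nthen stop'
as_text = render_value(template, "text")
as_json = render_value(template, "json")

# The JSON form is a single line that round-trips back to the original.
round_tripped = json.loads(as_json)
```

This is why the JSON-format test earlier in the commit decodes the captured output with `json.loads` before comparing it to the stored value.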

View file

@ -0,0 +1,65 @@
"""
Lists configuration items for a specified type
"""
import argparse
import os
import json
from trustgraph.api import Api
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def list_config_items(url, config_type, format_type):
api = Api(url).config()
keys = api.list(config_type)
if format_type == "json":
print(json.dumps(keys))
else:
for key in keys:
print(key)
def main():
parser = argparse.ArgumentParser(
prog='tg-list-config-items',
description=__doc__,
)
parser.add_argument(
'--type',
required=True,
help='Configuration type to list',
)
parser.add_argument(
'--format',
choices=['text', 'json'],
default='text',
help='Output format (default: text)',
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
args = parser.parse_args()
try:
list_config_items(
url=args.api_url,
config_type=args.type,
format_type=args.format,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,80 @@
"""
Sets a configuration item
"""
import argparse
import os
import sys
from trustgraph.api import Api
from trustgraph.api.types import ConfigValue
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def put_config_item(url, config_type, key, value):
api = Api(url).config()
config_value = ConfigValue(type=config_type, key=key, value=value)
api.put([config_value])
print(f"Configuration item set: {config_type}/{key}")
def main():
parser = argparse.ArgumentParser(
prog='tg-put-config-item',
description=__doc__,
)
parser.add_argument(
'--type',
required=True,
help='Configuration type',
)
parser.add_argument(
'--key',
required=True,
help='Configuration key',
)
value_group = parser.add_mutually_exclusive_group(required=True)
value_group.add_argument(
'--value',
help='Configuration value',
)
value_group.add_argument(
'--stdin',
action='store_true',
help='Read configuration value from standard input',
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
args = parser.parse_args()
try:
if args.stdin:
value = sys.stdin.read()
else:
value = args.value
put_config_item(
url=args.api_url,
config_type=args.type,
key=args.key,
value=value,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()
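Note on `--value` and `--stdin`: the parser puts both options in a required mutually exclusive group, so exactly one of them must be given and `argparse` rejects the call otherwise. The sketch below isolates that behaviour. The `put-demo` program name and the `resolve_value` helper are hypothetical, and standard input is passed in as a parameter so the example does not depend on a real pipe.

```python
import argparse
import io


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="put-demo")
    # required=True: one of the two options must be supplied.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--value")
    group.add_argument("--stdin", action="store_true")
    return parser


def resolve_value(argv, stdin) -> str:
    # Same selection logic as the CLI: read the stream only when --stdin is set.
    args = build_parser().parse_args(argv)
    return stdin.read() if args.stdin else args.value


from_flag = resolve_value(["--value", "hello"], io.StringIO("ignored"))
from_pipe = resolve_value(["--stdin"], io.StringIO("line 1\nline 2"))

# Supplying both options is a usage error; argparse raises SystemExit.
try:
    resolve_value(["--value", "x", "--stdin"], io.StringIO(""))
    rejected = False
except SystemExit:
    rejected = True
```

Reading from standard input keeps multi-line values, such as prompt templates, out of shell quoting.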

View file

@ -9,6 +9,10 @@ This script allows you to define agent tools with various types including:
Tools are stored in the 'tool' configuration group and can include
argument specifications for parameterized execution.
IMPORTANT: The tool 'name' is used by agents to invoke the tool and must
be a valid function identifier (use snake_case, no spaces or special chars).
The 'description' provides human-readable information about the tool.
"""
from typing import List
@ -114,14 +118,15 @@ def main():
number - Numeric parameter
Examples:
%(prog)s --id weather --name "Weather lookup" \\
%(prog)s --id weather_tool --name get_weather \\
--type knowledge-query \\
--description "Get weather information" \\
--description "Get weather information for a location" \\
--argument location:string:"Location to query" \\
--argument units:string:"Temperature units (C/F)"
%(prog)s --id calculator --name "Calculator" --type mcp-tool \\
--description "Perform calculations" \\
%(prog)s --id calc_tool --name calculate --type mcp-tool \\
--description "Perform mathematical calculations" \\
--mcp-tool calculator \\
--argument expression:string:"Mathematical expression"
''').strip(),
formatter_class=argparse.RawDescriptionHelpFormatter
@ -140,7 +145,7 @@ def main():
parser.add_argument(
'--name',
help=f'Human-readable tool name',
help='Tool name used by agents to invoke this tool (use snake_case, e.g., get_weather)',
)
parser.add_argument(
@ -221,4 +226,4 @@ def main():
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -104,6 +104,13 @@ class AgentManager:
# Parse Action
if line.startswith("Action:"):
action = line[7:].strip()
# Get rid of quotation prefix/suffix if present
while action and action[0] == '"':
action = action[1:]
while action and action[-1] == '"':
action = action[:-1]
# Parse Args
if line.startswith("Args:"):
@ -250,14 +257,25 @@ class AgentManager:
await think(act.thought)
logger.debug(f"ACTION: {act.name}")
logger.debug(f"Tools: {self.tools.keys()}")
if act.name in self.tools:
action = self.tools[act.name]
else:
logger.debug(f"Tools: {self.tools}")
raise RuntimeError(f"No action for {act.name}!")
logger.debug(f"TOOL>>> {act}")
resp = await action.implementation(context).invoke(
# Instantiate the tool implementation with context and config
if action.config:
tool_instance = action.implementation(context, **action.config)
else:
tool_instance = action.implementation(context)
resp = await tool_instance.invoke(
**act.arguments
)
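Note on the two agent-manager changes: the parser now strips double quotes that a model may wrap around an action name, and the tool implementation is built with extra keyword arguments when the tool carries a config. Both are sketched below under stated assumptions. `strip_quotes`, `EchoTool` and `instantiate` are hypothetical names for this illustration, and `invoke` is synchronous here for brevity where the real one is awaited.

```python
def strip_quotes(action: str) -> str:
    # Same loops as in the diff: peel quotes off both ends.
    while action and action[0] == '"':
        action = action[1:]
    while action and action[-1] == '"':
        action = action[:-1]
    return action


class EchoTool:
    # Hypothetical tool implementation accepting one optional config key.
    def __init__(self, context, prefix=""):
        self.context = context
        self.prefix = prefix

    def invoke(self, **arguments):
        return f"{self.prefix}{sorted(arguments.items())}"


def instantiate(implementation, context, config):
    # Pass config entries as keyword arguments only when a config exists.
    if config:
        return implementation(context, **config)
    return implementation(context)


name = strip_quotes('"get_weather"')
configured = instantiate(EchoTool, "ctx", {"prefix": "weather: "})
plain = instantiate(EchoTool, "ctx", None)
```

The two loops give the same result as `action.strip('"')`; the explicit form is kept here only to mirror the diff.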

View file

@ -45,7 +45,7 @@ class Processor(AgentService):
)
self.agent = AgentManager(
tools=[],
tools={},
additional_context="",
)
@ -106,11 +106,21 @@ class Processor(AgentService):
impl = TextCompletionImpl
arguments = TextCompletionImpl.get_arguments()
elif impl_id == "mcp-tool":
# For MCP tools, arguments come from config (similar to prompt tools)
config_args = data.get("arguments", [])
arguments = [
Argument(
name=arg.get("name"),
type=arg.get("type"),
description=arg.get("description")
)
for arg in config_args
]
impl = functools.partial(
McpToolImpl,
mcp_tool_id=data.get("mcp-tool")
mcp_tool_id=data.get("mcp-tool"),
arguments=arguments
)
arguments = McpToolImpl.get_arguments()
elif impl_id == "prompt":
# For prompt tools, arguments come from config
config_args = data.get("arguments", [])
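Note on the `mcp-tool` branch: `functools.partial` pre-binds the tool id and the configured arguments, so the agent manager can later build the tool by passing only the context. The sketch below shows that shape. `DemoMcpTool` is a hypothetical stand-in with the same constructor signature as the one in this diff, and the argument entries are plain dicts here, not `Argument` objects.

```python
import functools


class DemoMcpTool:
    # Hypothetical stand-in for McpToolImpl; constructor shape as in the diff.
    def __init__(self, context, mcp_tool_id, arguments=None):
        self.context = context
        self.mcp_tool_id = mcp_tool_id
        self.arguments = arguments or []

    def get_arguments(self):
        # Instance method: returns whatever was configured, or an empty list.
        return self.arguments


config = {
    "mcp-tool": "calculator",
    "arguments": [
        {"name": "expression", "type": "string", "description": "Expression"},
    ],
}

# Bind everything except the context, which is only known at invocation time.
impl = functools.partial(
    DemoMcpTool,
    mcp_tool_id=config.get("mcp-tool"),
    arguments=config.get("arguments", []),
)

tool = impl("fake-context")
```

Because `get_arguments` is now an instance method, it can no longer be called on the class itself before a tool has been constructed.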

View file

@ -27,7 +27,8 @@ class KnowledgeQueryImpl:
client = self.context("graph-rag-request")
logger.debug("Graph RAG question...")
return await client.rag(
arguments.get("question")
arguments.get("question"),
collection=self.collection if self.collection else "default"
)
# This tool implementation knows how to do text completion. This uses
@ -57,15 +58,14 @@ class TextCompletionImpl:
# the mcp-tool service.
class McpToolImpl:
def __init__(self, context, mcp_tool_id):
def __init__(self, context, mcp_tool_id, arguments=None):
self.context = context
self.mcp_tool_id = mcp_tool_id
self.arguments = arguments or []
@staticmethod
def get_arguments():
# MCP tools define their own arguments dynamically
# For now, we return empty list and let the MCP service handle validation
return []
def get_arguments(self):
# Return configured arguments if available, otherwise empty list for backward compatibility
return self.arguments
async def invoke(self, **arguments):

View file

@ -153,11 +153,81 @@ class Processor(FlowProcessor):
text=text
)
return objects if isinstance(objects, list) else []
if not isinstance(objects, list):
return []
# Validate each object against schema
validated_objects = []
for obj in objects:
if self.validate_object(obj, schema, schema_name):
validated_objects.append(obj)
else:
logger.warning(f"Skipping invalid object for schema {schema_name}")
return validated_objects
except Exception as e:
logger.error(f"Failed to extract objects for schema {schema_name}: {e}", exc_info=True)
return []
def validate_object(self, obj: Any, schema: RowSchema, schema_name: str) -> bool:
"""Validate object against schema definition"""
if not isinstance(obj, dict):
logger.warning(f"Object for schema {schema_name} is not a dictionary: {type(obj)}")
return False
# Check if this looks like a nested format issue
if schema_name in obj and isinstance(obj[schema_name], str):
logger.error(f"Object has nested JSON format under '{schema_name}' key - LLM returned incorrect format")
return False
# Check all required fields are present
for field in schema.fields:
if field.required and field.name not in obj:
logger.warning(f"Required field '{field.name}' missing in {schema_name} object")
return False
# Check primary key fields are not null
if field.primary and (field.name not in obj or obj[field.name] is None):
logger.error(f"Primary key field '{field.name}' is null or missing in {schema_name} object")
return False
# Validate basic type compatibility if value exists
if field.name in obj and obj[field.name] is not None:
value = obj[field.name]
# Type validation
if field.type == "integer":
try:
# Accept numeric strings that can be converted
if isinstance(value, str):
int(value)
elif not isinstance(value, (int, float)):
logger.warning(f"Field '{field.name}' in {schema_name} expected integer, got {type(value).__name__}")
return False
except ValueError:
logger.warning(f"Field '{field.name}' in {schema_name} value '{value}' cannot be converted to integer")
return False
elif field.type == "float":
try:
if isinstance(value, str):
float(value)
elif not isinstance(value, (int, float)):
logger.warning(f"Field '{field.name}' in {schema_name} expected float, got {type(value).__name__}")
return False
except ValueError:
logger.warning(f"Field '{field.name}' in {schema_name} value '{value}' cannot be converted to float")
return False
elif field.type == "boolean":
if not isinstance(value, (bool, str, int)):
logger.warning(f"Field '{field.name}' in {schema_name} expected boolean, got {type(value).__name__}")
return False
logger.debug(f"Object validated successfully for schema {schema_name}")
return True
async def on_chunk(self, msg, consumer, flow):
"""Process incoming chunk and extract objects"""
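
One detail of the integer branch in `validate_object`: the test `isinstance(value, (int, float))` lets a value such as `3.5` through, and because `bool` is a subclass of `int` in Python, `True` passes as well. The commit does not say whether that leniency is intended. If stricter checking were wanted, a helper along these lines could be used; `is_integer_like` is hypothetical and not part of the commit.

```python
def is_integer_like(value) -> bool:
    """Return True only for values that represent a whole number."""
    # bool is a subclass of int, so rule it out first
    if isinstance(value, bool):
        return False
    if isinstance(value, int):
        return True
    # Accept floats such as 3.0 but reject 3.5, NaN and infinity
    if isinstance(value, float):
        return value.is_integer()
    # Accept numeric strings that int() can parse, as the commit does
    if isinstance(value, str):
        try:
            int(value)
        except ValueError:
            return False
        return True
    return False
```

With this helper the string `"42"` is still accepted, matching the commit's behaviour for numeric strings, while `"3.0"` is rejected because `int("3.0")` raises `ValueError`.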


@ -14,6 +14,7 @@ dependencies = [
"pulsar-client",
"google-cloud-aiplatform",
"prometheus-client",
"anthropic",
]
classifiers = [
"Programming Language :: Python :: 3",


@ -1,7 +1,7 @@
"""
Simple LLM service, performs text prompt completion using VertexAI on
Google Cloud. Input is prompt, output is response.
Supports both Google's Gemini models and Anthropic's Claude models.
"""
#
@ -17,7 +17,7 @@ Google Cloud. Input is prompt, output is response.
# This module's imports bring in a lot of libraries.
from google.oauth2 import service_account
import google
import google.auth
import vertexai
import logging
@ -27,6 +27,9 @@ from vertexai.generative_models import (
HarmCategory, HarmBlockThreshold, Part, Tool, SafetySetting,
)
# Added for Anthropic model support
from anthropic import AnthropicVertex, RateLimitError
from .... exceptions import TooManyRequests
from .... base import LlmService, LlmResult
@ -35,7 +38,7 @@ logger = logging.getLogger(__name__)
default_ident = "text-completion"
default_model = 'gemini-2.0-flash-001'
default_model = 'gemini-1.5-flash-001'
default_region = 'us-central1'
default_temperature = 0.0
default_max_output = 8192
@ -52,111 +55,148 @@ class Processor(LlmService):
max_output = params.get("max_output", default_max_output)
if private_key is None:
raise RuntimeError("Private key file not specified")
logger.warning("Private key file not specified, using Application Default Credentials")
super(Processor, self).__init__(**params)
self.parameters = {
self.model = model
self.is_anthropic = 'claude' in self.model.lower()
# Shared parameters for both model types
self.api_params = {
"temperature": temperature,
"top_p": 1.0,
"top_k": 32,
"candidate_count": 1,
"max_output_tokens": max_output,
}
self.generation_config = GenerationConfig(
temperature=temperature,
top_p=1.0,
top_k=10,
candidate_count=1,
max_output_tokens=max_output,
)
# Block none doesn't seem to work
block_level = HarmBlockThreshold.BLOCK_ONLY_HIGH
# block_level = HarmBlockThreshold.BLOCK_NONE
self.safety_settings = [
SafetySetting(
category = HarmCategory.HARM_CATEGORY_HARASSMENT,
threshold = block_level,
),
SafetySetting(
category = HarmCategory.HARM_CATEGORY_HATE_SPEECH,
threshold = block_level,
),
SafetySetting(
category = HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
threshold = block_level,
),
SafetySetting(
category = HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
threshold = block_level,
),
]
logger.info("Initializing VertexAI...")
# Unified credential and project ID loading
if private_key:
credentials = (
service_account.Credentials.from_service_account_file(
private_key
)
)
project_id = credentials.project_id
else:
credentials = None
credentials, project_id = google.auth.default()
if credentials:
vertexai.init(
location=region,
credentials=credentials,
project=credentials.project_id,
)
else:
vertexai.init(
location=region
if not project_id:
raise RuntimeError(
"Could not determine Google Cloud project ID. "
"Ensure it's set in your environment or service account."
)
logger.info(f"Initializing model {model}")
self.llm = GenerativeModel(model)
self.model = model
# Initialize the appropriate client based on the model type
if self.is_anthropic:
logger.info(f"Initializing Anthropic model '{model}' via AnthropicVertex SDK")
# Initialize AnthropicVertex with credentials if provided, otherwise use ADC
anthropic_kwargs = {'region': region, 'project_id': project_id}
if credentials and private_key: # Pass credentials only if from a file
anthropic_kwargs['credentials'] = credentials
logger.debug("Using service account credentials for Anthropic model")
else:
logger.debug("Using Application Default Credentials for Anthropic model")
self.llm = AnthropicVertex(**anthropic_kwargs)
else:
# For Gemini models, initialize the Vertex AI SDK
logger.info(f"Initializing Google model '{model}' via Vertex AI SDK")
init_kwargs = {'location': region, 'project': project_id}
if credentials and private_key: # Pass credentials only if from a file
init_kwargs['credentials'] = credentials
vertexai.init(**init_kwargs)
self.llm = GenerativeModel(model)
self.generation_config = GenerationConfig(
temperature=temperature,
top_p=1.0,
top_k=10,
candidate_count=1,
max_output_tokens=max_output,
)
# Block none doesn't seem to work
block_level = HarmBlockThreshold.BLOCK_ONLY_HIGH
# block_level = HarmBlockThreshold.BLOCK_NONE
self.safety_settings = [
SafetySetting(
category = HarmCategory.HARM_CATEGORY_HARASSMENT,
threshold = block_level,
),
SafetySetting(
category = HarmCategory.HARM_CATEGORY_HATE_SPEECH,
threshold = block_level,
),
SafetySetting(
category = HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
threshold = block_level,
),
SafetySetting(
category = HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
threshold = block_level,
),
]
logger.info("VertexAI initialization complete")
async def generate_content(self, system, prompt):
try:
if self.is_anthropic:
# Anthropic API uses a dedicated system prompt
logger.debug("Sending request to Anthropic model...")
response = self.llm.messages.create(
model=self.model,
system=system,
messages=[{"role": "user", "content": prompt}],
max_tokens=self.api_params['max_output_tokens'],
temperature=self.api_params['temperature'],
top_p=self.api_params['top_p'],
top_k=self.api_params['top_k'],
)
prompt = system + "\n\n" + prompt
resp = LlmResult(
text=response.content[0].text,
in_token=response.usage.input_tokens,
out_token=response.usage.output_tokens,
model=self.model
)
else:
# Gemini API combines system and user prompts
logger.debug("Sending request to Gemini model...")
full_prompt = system + "\n\n" + prompt
response = self.llm.generate_content(
prompt, generation_config = self.generation_config,
safety_settings = self.safety_settings,
)
response = self.llm.generate_content(
full_prompt, generation_config = self.generation_config,
safety_settings = self.safety_settings,
)
resp = LlmResult(
text = response.text,
in_token = response.usage_metadata.prompt_token_count,
out_token = response.usage_metadata.candidates_token_count,
model = self.model
)
resp = LlmResult(
text = response.text,
in_token = response.usage_metadata.prompt_token_count,
out_token = response.usage_metadata.candidates_token_count,
model = self.model
)
logger.info(f"Input Tokens: {resp.in_token}")
logger.info(f"Output Tokens: {resp.out_token}")
logger.debug("Send response...")
return resp
except google.api_core.exceptions.ResourceExhausted as e:
except (google.api_core.exceptions.ResourceExhausted, RateLimitError) as e:
logger.warning(f"Hit rate limit: {e}")
# Leave rate limit retries to the base handler
raise TooManyRequests()
except Exception as e:
# Apart from rate limits, treat all exceptions as unrecoverable
logger.error(f"VertexAI LLM exception: {e}", exc_info=True)
raise e
@ -169,12 +209,12 @@ class Processor(LlmService):
parser.add_argument(
'-m', '--model',
default=default_model,
help=f'LLM model (default: {default_model})'
help=f'LLM model (e.g., gemini-1.5-flash-001, claude-3-sonnet@20240229) (default: {default_model})'
)
parser.add_argument(
'-k', '--private-key',
help=f'Google Cloud private JSON file'
help='Google Cloud private JSON file (optional, uses ADC if not provided)'
)
parser.add_argument(
@ -199,4 +239,3 @@ class Processor(LlmService):
def run():
Processor.launch(default_ident, __doc__)
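
The processor in this file picks a client by looking for `claude` in the model name, then shapes the request differently: the Anthropic path passes the system prompt as its own field, while the Gemini path concatenates system and user prompts. The sketch below isolates that dispatch without importing either SDK. `is_anthropic_model` and `build_request` are hypothetical helpers, and the returned dicts only illustrate the shape of each request, not a complete call to either library.

```python
def is_anthropic_model(model: str) -> bool:
    # Mirrors the commit's check: any model name containing "claude"
    return "claude" in model.lower()


def build_request(model, system, prompt, max_output=8192, temperature=0.0):
    """Return illustrative request arguments for the selected provider."""
    if is_anthropic_model(model):
        # Anthropic takes the system prompt as a dedicated field
        return {
            "model": model,
            "system": system,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": max_output,
            "temperature": temperature,
        }
    # Gemini path: system and user prompts are joined into one string
    return {"contents": system + "\n\n" + prompt}


claude_req = build_request(
    "claude-3-sonnet@20240229", "Be brief.", "What is a graph?"
)
gemini_req = build_request(
    "gemini-1.5-flash-001", "Be brief.", "What is a graph?"
)
```

A substring match is simple but coarse: any model whose name happens to contain `claude` is routed to the Anthropic client, so an explicit provider option might be preferable if more model families are added.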