diff --git a/docs/tech-specs/tool-services.md b/docs/tech-specs/tool-services.md index 994f8a01..6edf7551 100644 --- a/docs/tech-specs/tool-services.md +++ b/docs/tech-specs/tool-services.md @@ -2,7 +2,7 @@ ## Status -Draft - Gathering Requirements +Implemented ## Overview @@ -25,9 +25,9 @@ This is a two-tier model, analogous to MCP tools: - MCP: MCP server defines the tool interface → Tool config references it - Tool Services: Tool service defines the Pulsar interface → Tool config references it -## Current Architecture +## Background: Existing Tools -### Existing Tool Implementation +### Built-in Tool Implementation Tools are currently defined in `trustgraph-flow/trustgraph/agent/react/tools.py` with typed implementations: @@ -55,20 +55,21 @@ elif impl_id == "text-completion": # ... etc ``` -## Proposed Architecture +## Architecture ### Two-Tier Model #### Tier 1: Tool Service Descriptor A tool service defines a Pulsar service interface. It declares: -- The topic to call +- The Pulsar queues for request/response - Configuration parameters it requires from tools that use it ```json { "id": "custom-rag", - "topic": "custom-rag-request", + "request-queue": "non-persistent://tg/request/custom-rag", + "response-queue": "non-persistent://tg/response/custom-rag", "config-params": [ {"name": "collection", "required": true} ] @@ -80,7 +81,8 @@ A tool service that needs no configuration parameters: ```json { "id": "calculator", - "topic": "calc-request", + "request-queue": "non-persistent://tg/request/calc", + "response-queue": "non-persistent://tg/response/calc", "config-params": [] } ``` @@ -132,38 +134,34 @@ Multiple tools can reference the same service with different configurations: When a tool is invoked, the request to the tool service includes: - `user`: From the agent request (multi-tenancy) -- Config values: From the tool descriptor (e.g., `collection`) -- `arguments`: From the LLM +- `config`: JSON-encoded config values from the tool descriptor +- `arguments`: 
JSON-encoded arguments from the LLM ```json { "user": "alice", - "collection": "customers", - "arguments": { - "question": "What are the top customer complaints?" - } + "config": "{\"collection\": \"customers\"}", + "arguments": "{\"question\": \"What are the top customer complaints?\"}" } ``` +The tool service receives these as parsed dicts in the `invoke` method. + ### Generic Tool Service Implementation A `ToolServiceImpl` class invokes tool services based on configuration: ```python class ToolServiceImpl: - def __init__(self, service_topic, config_values, context): - self.service_topic = service_topic + def __init__(self, context, request_queue, response_queue, config_values, arguments, processor): + self.request_queue = request_queue + self.response_queue = response_queue self.config_values = config_values # e.g., {"collection": "customers"} - self.context = context + # ... - async def invoke(self, user, **arguments): - client = self.context(self.service_topic) - request = { - "user": user, - **self.config_values, - "arguments": arguments, - } - response = await client.call(request) + async def invoke(self, **arguments): + client = await self._get_or_create_client() + response = await client.call(user, self.config_values, arguments) if isinstance(response, str): return response else: @@ -197,9 +195,20 @@ The agent manager parses the LLM's response into `act.arguments` as a dict (`age Requests and responses use untyped dicts. No schema validation at the agent level - the tool service is responsible for validating its inputs. This provides maximum flexibility for defining new services. -### Client Interface: Direct Pulsar +### Client Interface: Direct Pulsar Topics -Tool services are invoked via direct Pulsar messaging, not through the existing typed client abstraction. The tool-service descriptor specifies a Pulsar queue name. A base class will be defined for implementing tool services. Implementation details to be determined during development. 
+Tool services use direct Pulsar topics without requiring flow configuration. The tool-service descriptor specifies the full queue names: + +```json +{ + "id": "joke-service", + "request-queue": "non-persistent://tg/request/joke", + "response-queue": "non-persistent://tg/response/joke", + "config-params": [...] +} +``` + +This allows services to be hosted in any namespace. ### Error Handling: Standard Error Convention @@ -232,8 +241,8 @@ This follows the existing pattern used throughout the codebase (e.g., `agent_ser Tool services can return streaming responses: - Multiple response messages with the same `id` in properties -- Each response includes `end_of_message: bool` field -- Final response has `end_of_message: True` +- Each response includes `end_of_stream: bool` field +- Final response has `end_of_stream: True` This matches the pattern used in `AgentResponse` and other streaming services. @@ -256,29 +265,187 @@ Tool services follow the same contract: This keeps the descriptor simple and places responsibility on the service to return an appropriate text response for the agent. -## Implementation Considerations +## Configuration Guide -### Configuration Structure +To add a new tool service, two configuration items are required: -Two new config sections: +### 1. Tool Service Configuration -``` -tool-service/ - custom-rag: {"id": "custom-rag", "topic": "...", "config-params": [...]} - calculator: {"id": "calculator", "topic": "...", "config-params": []} +Stored under the `tool-service` config key. Defines the Pulsar queues and available config parameters. 
-tool/ - query-customers: {"type": "tool-service", "service": "custom-rag", ...} - query-products: {"type": "tool-service", "service": "custom-rag", ...} +| Field | Required | Description | +|-------|----------|-------------| +| `id` | Yes | Unique identifier for the tool service | +| `request-queue` | Yes | Full Pulsar topic for requests (e.g., `non-persistent://tg/request/joke`) | +| `response-queue` | Yes | Full Pulsar topic for responses (e.g., `non-persistent://tg/response/joke`) | +| `config-params` | No | Array of config parameters the service accepts | + +Each config param can specify: +- `name`: Parameter name (required) +- `required`: Whether the parameter must be provided by tools (default: false) + +Example: +```json +{ + "id": "joke-service", + "request-queue": "non-persistent://tg/request/joke", + "response-queue": "non-persistent://tg/response/joke", + "config-params": [ + {"name": "style", "required": false} + ] +} ``` -### Files to Modify +### 2. Tool Configuration -| File | Changes | +Stored under the `tool` config key. Defines a tool that the agent can use. 
+ +| Field | Required | Description | +|-------|----------|-------------| +| `type` | Yes | Must be `"tool-service"` | +| `name` | Yes | Tool name exposed to the LLM | +| `description` | Yes | Description of what the tool does (shown to LLM) | +| `service` | Yes | ID of the tool-service to invoke | +| `arguments` | No | Array of argument definitions for the LLM | +| *(config params)* | Varies | Any config params defined by the service | + +Each argument can specify: +- `name`: Argument name (required) +- `type`: Data type, e.g., `"string"` (required) +- `description`: Description shown to the LLM (required) + +Example: +```json +{ + "type": "tool-service", + "name": "tell-joke", + "description": "Tell a joke on a given topic", + "service": "joke-service", + "style": "pun", + "arguments": [ + { + "name": "topic", + "type": "string", + "description": "The topic for the joke (e.g., programming, animals, food)" + } + ] +} +``` + +### Loading Configuration + +Use `tg-put-config-item` to load configurations: + +```bash +# Load tool-service config +tg-put-config-item tool-service/joke-service < joke-service.json + +# Load tool config +tg-put-config-item tool/tell-joke < tell-joke.json +``` + +The agent-manager must be restarted to pick up new configurations. 
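
The validation rules above (mandatory queue fields on the service descriptor, `required` config params enforced against the tool config) can be sketched as a small helper. This is illustrative only — `validate_tool_config` and its error messages are hypothetical names, not part of the TrustGraph API; the real checks happen during the agent-manager's config loading:

```python
# Hypothetical helper: checks a tool config against its tool-service
# descriptor before loading. Names here are illustrative, not TrustGraph API.
def validate_tool_config(tool_config, tool_services):
    service_id = tool_config.get("service")
    service = tool_services.get(service_id)
    if service is None:
        raise RuntimeError(f"Unknown tool-service '{service_id}'")
    # Both queue fields are mandatory on the service descriptor
    for key in ("request-queue", "response-queue"):
        if not service.get(key):
            raise RuntimeError(
                f"Tool-service '{service_id}' must define '{key}'"
            )
    # Enforce config params the service declares as required
    for param in service.get("config-params", []):
        name = param.get("name")
        if param.get("required", False) and name not in tool_config:
            raise RuntimeError(f"Missing required config param '{name}'")
    return True
```

Running a check like this before `tg-put-config-item` catches a missing required param (e.g. `collection` on the hypothetical `custom-rag` service) at load time rather than at tool-invocation time.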
+ +## Implementation Details + +### Schema + +Request and response types in `trustgraph-base/trustgraph/schema/services/tool_service.py`: + +```python +@dataclass +class ToolServiceRequest: + user: str = "" # User context for multi-tenancy + config: str = "" # JSON-encoded config values from tool descriptor + arguments: str = "" # JSON-encoded arguments from LLM + +@dataclass +class ToolServiceResponse: + error: Error | None = None + response: str = "" # String response (the observation) + end_of_stream: bool = False +``` + +### Server-Side: DynamicToolService + +Base class in `trustgraph-base/trustgraph/base/dynamic_tool_service.py`: + +```python +class DynamicToolService(AsyncProcessor): + """Base class for implementing tool services.""" + + def __init__(self, **params): + topic = params.get("topic", default_topic) + # Constructs topics: non-persistent://tg/request/{topic}, non-persistent://tg/response/{topic} + # Sets up Consumer and Producer + + async def invoke(self, user, config, arguments): + """Override this method to implement the tool's logic.""" + raise NotImplementedError() +``` + +### Client-Side: ToolServiceImpl + +Implementation in `trustgraph-flow/trustgraph/agent/react/tools.py`: + +```python +class ToolServiceImpl: + def __init__(self, context, request_queue, response_queue, config_values, arguments, processor): + # Uses the provided queue paths directly + # Creates ToolServiceClient on first use + + async def invoke(self, **arguments): + client = await self._get_or_create_client() + response = await client.call(user, config_values, arguments) + return response if isinstance(response, str) else json.dumps(response) +``` + +### Files + +| File | Purpose | |------|---------| -| `trustgraph-flow/trustgraph/agent/react/tools.py` | Add `ToolServiceImpl` | -| `trustgraph-flow/trustgraph/agent/react/service.py` | Load tool-service configs, handle `type: "tool-service"` in tool configs | -| `trustgraph-base/trustgraph/base/` | Add generic client call 
support | +| `trustgraph-base/trustgraph/schema/services/tool_service.py` | Request/response schemas | +| `trustgraph-base/trustgraph/base/tool_service_client.py` | Client for invoking services | +| `trustgraph-base/trustgraph/base/dynamic_tool_service.py` | Base class for service implementation | +| `trustgraph-flow/trustgraph/agent/react/tools.py` | `ToolServiceImpl` class | +| `trustgraph-flow/trustgraph/agent/react/service.py` | Config loading | + +### Example: Joke Service + +An example service in `trustgraph-flow/trustgraph/tool_service/joke/`: + +```python +class Processor(DynamicToolService): + async def invoke(self, user, config, arguments): + style = config.get("style", "pun") + topic = arguments.get("topic", "") + joke = pick_joke(topic, style) + return f"Hey {user}! Here's a {style} for you:\n\n{joke}" +``` + +Tool service config: +```json +{ + "id": "joke-service", + "request-queue": "non-persistent://tg/request/joke", + "response-queue": "non-persistent://tg/response/joke", + "config-params": [{"name": "style", "required": false}] +} +``` + +Tool config: +```json +{ + "type": "tool-service", + "name": "tell-joke", + "description": "Tell a joke on a given topic", + "service": "joke-service", + "style": "pun", + "arguments": [ + {"name": "topic", "type": "string", "description": "The topic for the joke"} + ] +} +``` ### Backward Compatibility diff --git a/tests/unit/test_agent/test_tool_service.py b/tests/unit/test_agent/test_tool_service.py new file mode 100644 index 00000000..8bcf39ce --- /dev/null +++ b/tests/unit/test_agent/test_tool_service.py @@ -0,0 +1,495 @@ +""" +Unit tests for Tool Service functionality + +Tests the dynamically pluggable tool services feature including: +- Tool service configuration parsing +- ToolServiceImpl initialization +- Request/response format +- Config parameter handling +""" + +import pytest +from unittest.mock import Mock, AsyncMock, patch, MagicMock +import json + + +class TestToolServiceConfigParsing: + """Test 
cases for tool service configuration parsing""" + + def test_tool_service_config_structure(self): + """Test that tool-service config has required fields""" + # Arrange + valid_config = { + "id": "joke-service", + "request-queue": "non-persistent://tg/request/joke", + "response-queue": "non-persistent://tg/response/joke", + "config-params": [ + {"name": "style", "required": False} + ] + } + + # Act & Assert + assert "id" in valid_config + assert "request-queue" in valid_config + assert "response-queue" in valid_config + assert valid_config["request-queue"].startswith("non-persistent://") + assert valid_config["response-queue"].startswith("non-persistent://") + + def test_tool_service_config_without_queues_is_invalid(self): + """Test that tool-service config requires request-queue and response-queue""" + # Arrange + invalid_config = { + "id": "joke-service", + "config-params": [] + } + + # Act & Assert + def validate_config(config): + request_queue = config.get("request-queue") + response_queue = config.get("response-queue") + if not request_queue or not response_queue: + raise RuntimeError("Tool-service must define 'request-queue' and 'response-queue'") + return True + + with pytest.raises(RuntimeError) as exc_info: + validate_config(invalid_config) + assert "request-queue" in str(exc_info.value) + + def test_tool_config_references_tool_service(self): + """Test that tool config correctly references a tool-service""" + # Arrange + tool_services = { + "joke-service": { + "id": "joke-service", + "request-queue": "non-persistent://tg/request/joke", + "response-queue": "non-persistent://tg/response/joke", + "config-params": [{"name": "style", "required": False}] + } + } + + tool_config = { + "type": "tool-service", + "name": "tell-joke", + "description": "Tell a joke on a given topic", + "service": "joke-service", + "style": "pun", + "arguments": [ + {"name": "topic", "type": "string", "description": "The topic for the joke"} + ] + } + + # Act + service_ref = 
tool_config.get("service") + service_config = tool_services.get(service_ref) + + # Assert + assert service_ref == "joke-service" + assert service_config is not None + assert service_config["request-queue"] == "non-persistent://tg/request/joke" + + def test_tool_config_extracts_config_values(self): + """Test that config values are extracted from tool config""" + # Arrange + tool_services = { + "joke-service": { + "id": "joke-service", + "request-queue": "non-persistent://tg/request/joke", + "response-queue": "non-persistent://tg/response/joke", + "config-params": [ + {"name": "style", "required": False}, + {"name": "max-length", "required": False} + ] + } + } + + tool_config = { + "type": "tool-service", + "name": "tell-joke", + "description": "Tell a joke", + "service": "joke-service", + "style": "pun", + "max-length": 100, + "arguments": [] + } + + # Act - simulate config extraction + service_config = tool_services[tool_config["service"]] + config_params = service_config.get("config-params", []) + config_values = {} + for param in config_params: + param_name = param.get("name") if isinstance(param, dict) else param + if param_name in tool_config: + config_values[param_name] = tool_config[param_name] + + # Assert + assert config_values == {"style": "pun", "max-length": 100} + + def test_required_config_param_validation(self): + """Test that required config params are validated""" + # Arrange + tool_services = { + "custom-service": { + "id": "custom-service", + "request-queue": "non-persistent://tg/request/custom", + "response-queue": "non-persistent://tg/response/custom", + "config-params": [ + {"name": "collection", "required": True}, + {"name": "optional-param", "required": False} + ] + } + } + + tool_config_missing_required = { + "type": "tool-service", + "name": "custom-tool", + "description": "Custom tool", + "service": "custom-service", + # Missing required "collection" param + "optional-param": "value" + } + + # Act & Assert + def 
validate_config_params(tool_config, service_config): + config_params = service_config.get("config-params", []) + for param in config_params: + param_name = param.get("name") + if param.get("required", False) and param_name not in tool_config: + raise RuntimeError(f"Missing required config param '{param_name}'") + return True + + service_config = tool_services["custom-service"] + with pytest.raises(RuntimeError) as exc_info: + validate_config_params(tool_config_missing_required, service_config) + assert "collection" in str(exc_info.value) + + +class TestToolServiceRequest: + """Test cases for tool service request format""" + + def test_request_format(self): + """Test that request is properly formatted with user, config, and arguments""" + # Arrange + user = "alice" + config_values = {"style": "pun", "collection": "jokes"} + arguments = {"topic": "programming"} + + # Act - simulate request building + request = { + "user": user, + "config": json.dumps(config_values), + "arguments": json.dumps(arguments) + } + + # Assert + assert request["user"] == "alice" + assert json.loads(request["config"]) == {"style": "pun", "collection": "jokes"} + assert json.loads(request["arguments"]) == {"topic": "programming"} + + def test_request_with_empty_config(self): + """Test request when no config values are provided""" + # Arrange + user = "bob" + config_values = {} + arguments = {"query": "test"} + + # Act + request = { + "user": user, + "config": json.dumps(config_values) if config_values else "{}", + "arguments": json.dumps(arguments) if arguments else "{}" + } + + # Assert + assert request["config"] == "{}" + assert json.loads(request["arguments"]) == {"query": "test"} + + +class TestToolServiceResponse: + """Test cases for tool service response handling""" + + def test_success_response_handling(self): + """Test handling of successful response""" + # Arrange + response = { + "error": None, + "response": "Hey alice! 
Here's a pun for you:\n\nWhy do programmers prefer dark mode?", + "end_of_stream": True + } + + # Act & Assert + assert response["error"] is None + assert "pun" in response["response"] + assert response["end_of_stream"] is True + + def test_error_response_handling(self): + """Test handling of error response""" + # Arrange + response = { + "error": { + "type": "tool-service-error", + "message": "Service unavailable" + }, + "response": "", + "end_of_stream": True + } + + # Act & Assert + assert response["error"] is not None + assert response["error"]["type"] == "tool-service-error" + assert response["error"]["message"] == "Service unavailable" + + def test_string_response_passthrough(self): + """Test that string responses are passed through directly""" + # Arrange + response_text = "This is a joke response" + + # Act - simulate response handling + def handle_response(response): + if isinstance(response, str): + return response + else: + return json.dumps(response) + + result = handle_response(response_text) + + # Assert + assert result == response_text + + def test_dict_response_json_serialization(self): + """Test that dict responses are JSON serialized""" + # Arrange + response_data = {"joke": "Why did the chicken cross the road?", "category": "classic"} + + # Act + def handle_response(response): + if isinstance(response, str): + return response + else: + return json.dumps(response) + + result = handle_response(response_data) + + # Assert + assert result == json.dumps(response_data) + assert json.loads(result) == response_data + + +class TestToolServiceImpl: + """Test cases for ToolServiceImpl class""" + + def test_tool_service_impl_initialization(self): + """Test ToolServiceImpl stores queues and config correctly""" + # Arrange + class MockArgument: + def __init__(self, name, type, description): + self.name = name + self.type = type + self.description = description + + # Simulate ToolServiceImpl initialization + class MockToolServiceImpl: + def __init__(self, 
context, request_queue, response_queue, config_values=None, arguments=None, processor=None): + self.context = context + self.request_queue = request_queue + self.response_queue = response_queue + self.config_values = config_values or {} + self.arguments = arguments or [] + self.processor = processor + self._client = None + + def get_arguments(self): + return self.arguments + + # Act + arguments = [ + MockArgument("topic", "string", "The topic for the joke") + ] + + impl = MockToolServiceImpl( + context=lambda x: None, + request_queue="non-persistent://tg/request/joke", + response_queue="non-persistent://tg/response/joke", + config_values={"style": "pun"}, + arguments=arguments, + processor=Mock() + ) + + # Assert + assert impl.request_queue == "non-persistent://tg/request/joke" + assert impl.response_queue == "non-persistent://tg/response/joke" + assert impl.config_values == {"style": "pun"} + assert len(impl.get_arguments()) == 1 + assert impl.get_arguments()[0].name == "topic" + + def test_tool_service_impl_client_caching(self): + """Test that client is cached and reused""" + # Arrange + client_key = "non-persistent://tg/request/joke|non-persistent://tg/response/joke" + + # Simulate client caching behavior + tool_service_clients = {} + + def get_or_create_client(request_queue, response_queue, clients_cache): + client_key = f"{request_queue}|{response_queue}" + if client_key in clients_cache: + return clients_cache[client_key], False # False = not created + client = Mock() + clients_cache[client_key] = client + return client, True # True = newly created + + # Act + client1, created1 = get_or_create_client( + "non-persistent://tg/request/joke", + "non-persistent://tg/response/joke", + tool_service_clients + ) + client2, created2 = get_or_create_client( + "non-persistent://tg/request/joke", + "non-persistent://tg/response/joke", + tool_service_clients + ) + + # Assert + assert created1 is True + assert created2 is False + assert client1 is client2 + + +class 
TestJokeServiceLogic: + """Test cases for the joke service example""" + + def test_topic_to_category_mapping(self): + """Test that topics are mapped to categories correctly""" + # Arrange + def map_topic_to_category(topic): + topic = topic.lower() + if "program" in topic or "code" in topic or "computer" in topic or "software" in topic: + return "programming" + elif "llama" in topic: + return "llama" + elif "animal" in topic or "dog" in topic or "cat" in topic or "bird" in topic: + return "animals" + elif "food" in topic or "eat" in topic or "cook" in topic or "drink" in topic: + return "food" + else: + return "default" + + # Act & Assert + assert map_topic_to_category("programming") == "programming" + assert map_topic_to_category("software engineering") == "programming" + assert map_topic_to_category("llamas") == "llama" + assert map_topic_to_category("llama") == "llama" + assert map_topic_to_category("animals") == "animals" + assert map_topic_to_category("my dog") == "animals" + assert map_topic_to_category("food") == "food" + assert map_topic_to_category("cooking recipes") == "food" + assert map_topic_to_category("random topic") == "default" + assert map_topic_to_category("") == "default" + + def test_joke_response_personalization(self): + """Test that joke responses include user personalization""" + # Arrange + user = "alice" + style = "pun" + joke = "Why do programmers prefer dark mode? Because light attracts bugs!" + + # Act + response = f"Hey {user}! Here's a {style} for you:\n\n{joke}" + + # Assert + assert "Hey alice!" 
in response + assert "pun" in response + assert joke in response + + def test_style_normalization(self): + """Test that invalid styles fall back to valid ones""" + import random + + # Arrange + valid_styles = ["pun", "dad-joke", "one-liner"] + + def normalize_style(style): + if style not in valid_styles: + return random.choice(valid_styles) + return style + + # Act & Assert + assert normalize_style("pun") == "pun" + assert normalize_style("dad-joke") == "dad-joke" + assert normalize_style("one-liner") == "one-liner" + assert normalize_style("invalid-style") in valid_styles + assert normalize_style("") in valid_styles + + +class TestDynamicToolServiceBase: + """Test cases for DynamicToolService base class behavior""" + + def test_topic_to_pulsar_path_conversion(self): + """Test that topic names are converted to full Pulsar paths""" + # Arrange + topic = "joke" + + # Act + request_topic = f"non-persistent://tg/request/{topic}" + response_topic = f"non-persistent://tg/response/{topic}" + + # Assert + assert request_topic == "non-persistent://tg/request/joke" + assert response_topic == "non-persistent://tg/response/joke" + + def test_request_parsing(self): + """Test parsing of incoming request""" + # Arrange + request_data = { + "user": "alice", + "config": '{"style": "pun"}', + "arguments": '{"topic": "programming"}' + } + + # Act + user = request_data.get("user", "trustgraph") + config = json.loads(request_data["config"]) if request_data["config"] else {} + arguments = json.loads(request_data["arguments"]) if request_data["arguments"] else {} + + # Assert + assert user == "alice" + assert config == {"style": "pun"} + assert arguments == {"topic": "programming"} + + def test_response_building(self): + """Test building of response message""" + # Arrange + response_text = "Hey alice! 
Here's a joke" + error = None + + # Act + response = { + "error": error, + "response": response_text if isinstance(response_text, str) else json.dumps(response_text), + "end_of_stream": True + } + + # Assert + assert response["error"] is None + assert response["response"] == "Hey alice! Here's a joke" + assert response["end_of_stream"] is True + + def test_error_response_building(self): + """Test building of error response""" + # Arrange + error_message = "Service temporarily unavailable" + + # Act + response = { + "error": { + "type": "tool-service-error", + "message": error_message + }, + "response": "", + "end_of_stream": True + } + + # Assert + assert response["error"]["type"] == "tool-service-error" + assert response["error"]["message"] == error_message + assert response["response"] == "" diff --git a/trustgraph-base/trustgraph/base/__init__.py b/trustgraph-base/trustgraph/base/__init__.py index 557109a2..f9f38060 100644 --- a/trustgraph-base/trustgraph/base/__init__.py +++ b/trustgraph-base/trustgraph/base/__init__.py @@ -32,6 +32,8 @@ from . agent_service import AgentService from . graph_rag_client import GraphRagClientSpec from . tool_service import ToolService from . tool_client import ToolClientSpec +from . dynamic_tool_service import DynamicToolService +from . tool_service_client import ToolServiceClientSpec from . agent_client import AgentClientSpec from . structured_query_client import StructuredQueryClientSpec from . row_embeddings_query_client import RowEmbeddingsQueryClientSpec diff --git a/trustgraph-base/trustgraph/base/dynamic_tool_service.py b/trustgraph-base/trustgraph/base/dynamic_tool_service.py new file mode 100644 index 00000000..f3fda6dd --- /dev/null +++ b/trustgraph-base/trustgraph/base/dynamic_tool_service.py @@ -0,0 +1,184 @@ + +""" +Base class for dynamically pluggable tool services. + +Tool services are Pulsar services that can be invoked as agent tools. 
+They receive a ToolServiceRequest with user, config, and arguments, +and return a ToolServiceResponse with the result. + +Uses direct Pulsar topics (no flow configuration required): +- Request: non-persistent://tg/request/{topic} +- Response: non-persistent://tg/response/{topic} +""" + +import json +import logging +import asyncio +import argparse +from prometheus_client import Counter + +from .. schema import ToolServiceRequest, ToolServiceResponse, Error +from .. exceptions import TooManyRequests +from . async_processor import AsyncProcessor +from . consumer import Consumer +from . producer import Producer +from . metrics import ConsumerMetrics, ProducerMetrics + +# Module logger +logger = logging.getLogger(__name__) + +default_concurrency = 1 +default_topic = "tool" + + +class DynamicToolService(AsyncProcessor): + """ + Base class for implementing dynamic tool services. + + Subclasses should override the `invoke` method to implement + the tool's logic. + + The invoke method receives: + - user: The user context for multi-tenancy + - config: Dict of config values from the tool descriptor + - arguments: Dict of arguments from the LLM + + And should return a string response (the observation). 
+ """ + + def __init__(self, **params): + + super(DynamicToolService, self).__init__(**params) + + self.id = params.get("id") + topic = params.get("topic", default_topic) + + # Build direct Pulsar topic paths + request_topic = f"non-persistent://tg/request/{topic}" + response_topic = f"non-persistent://tg/response/{topic}" + + logger.info(f"Tool service topics: request={request_topic}, response={response_topic}") + + # Create consumer for requests + consumer_metrics = ConsumerMetrics( + processor=self.id, flow=None, name="request" + ) + + self.consumer = Consumer( + taskgroup=self.taskgroup, + backend=self.pubsub, + subscriber=f"{self.id}-request", + flow=None, + topic=request_topic, + schema=ToolServiceRequest, + handler=self.on_request, + metrics=consumer_metrics, + ) + + # Create producer for responses + producer_metrics = ProducerMetrics( + processor=self.id, flow=None, name="response" + ) + + self.producer = Producer( + backend=self.pubsub, + topic=response_topic, + schema=ToolServiceResponse, + metrics=producer_metrics, + ) + + if not hasattr(__class__, "tool_service_metric"): + __class__.tool_service_metric = Counter( + 'dynamic_tool_service_invocation_count', + 'Dynamic tool service invocation count', + ["id"], + ) + + async def start(self): + await super(DynamicToolService, self).start() + await self.producer.start() + await self.consumer.start() + logger.info(f"Tool service {self.id} started") + + async def on_request(self, msg, consumer, flow): + + id = None + + try: + + request = msg.value() + + # Sender-produced ID for correlation + id = msg.properties().get("id", "unknown") + + # Parse the request + user = request.user or "trustgraph" + config = json.loads(request.config) if request.config else {} + arguments = json.loads(request.arguments) if request.arguments else {} + + logger.debug(f"Tool service request: user={user}, config={config}, arguments={arguments}") + + # Invoke the tool implementation + response = await self.invoke(user, config, 
arguments) + + # Send success response + await self.producer.send( + ToolServiceResponse( + error=None, + response=response if isinstance(response, str) else json.dumps(response), + end_of_stream=True, + ), + properties={"id": id} + ) + + __class__.tool_service_metric.labels( + id=self.id, + ).inc() + + except TooManyRequests as e: + raise e + + except Exception as e: + + logger.error(f"Exception in dynamic tool service: {e}", exc_info=True) + + logger.info("Sending error response...") + + await self.producer.send( + ToolServiceResponse( + error=Error( + type="tool-service-error", + message=str(e), + ), + response="", + end_of_stream=True, + ), + properties={"id": id if id else "unknown"} + ) + + async def invoke(self, user, config, arguments): + """ + Invoke the tool service. + + Override this method in subclasses to implement the tool's logic. + + Args: + user: The user context for multi-tenancy + config: Dict of config values from the tool descriptor + arguments: Dict of arguments from the LLM + + Returns: + A string response (the observation) or a dict/list that will be JSON-encoded + """ + raise NotImplementedError("Subclasses must implement invoke()") + + @staticmethod + def add_args(parser): + + AsyncProcessor.add_args(parser) + + parser.add_argument( + '-t', '--topic', + default=default_topic, + help=f'Topic name for request/response (default: {default_topic})' + ) diff --git a/trustgraph-base/trustgraph/base/tool_service_client.py b/trustgraph-base/trustgraph/base/tool_service_client.py new file mode 100644 index 00000000..81930ba0 --- /dev/null +++ b/trustgraph-base/trustgraph/base/tool_service_client.py @@ -0,0 +1,90 @@ + +import json +import logging + +from . request_response_spec import RequestResponse, RequestResponseSpec +from .. 
schema import ToolServiceRequest, ToolServiceResponse + +logger = logging.getLogger(__name__) + + +class ToolServiceClient(RequestResponse): + """Client for invoking dynamically configured tool services.""" + + async def call(self, user, config, arguments, timeout=600): + """ + Call a tool service. + + Args: + user: User context for multi-tenancy + config: Dict of config values (e.g., {"collection": "customers"}) + arguments: Dict of arguments from LLM + timeout: Request timeout in seconds + + Returns: + Response string from the tool service + """ + resp = await self.request( + ToolServiceRequest( + user=user, + config=json.dumps(config) if config else "{}", + arguments=json.dumps(arguments) if arguments else "{}", + ), + timeout=timeout + ) + + if resp.error: + raise RuntimeError(resp.error.message) + + return resp.response + + async def call_streaming(self, user, config, arguments, callback, timeout=600): + """ + Call a tool service with streaming response. + + Args: + user: User context for multi-tenancy + config: Dict of config values + arguments: Dict of arguments from LLM + callback: Async function called with each response chunk + timeout: Request timeout in seconds + + Returns: + Final response string + """ + result = [] + + async def handle_response(resp): + if resp.error: + raise RuntimeError(resp.error.message) + + if resp.response: + result.append(resp.response) + await callback(resp.response) + + return resp.end_of_stream + + await self.request( + ToolServiceRequest( + user=user, + config=json.dumps(config) if config else "{}", + arguments=json.dumps(arguments) if arguments else "{}", + ), + timeout=timeout, + recipient=handle_response + ) + + return "".join(result) + + +class ToolServiceClientSpec(RequestResponseSpec): + """Specification for a tool service client.""" + + def __init__(self, request_name, response_name): + super(ToolServiceClientSpec, self).__init__( + request_name=request_name, + request_schema=ToolServiceRequest, + 
response_name=response_name, + response_schema=ToolServiceResponse, + impl=ToolServiceClient, + ) diff --git a/trustgraph-base/trustgraph/schema/services/__init__.py b/trustgraph-base/trustgraph/schema/services/__init__.py index 7b40ca0a..f246bc31 100644 --- a/trustgraph-base/trustgraph/schema/services/__init__.py +++ b/trustgraph-base/trustgraph/schema/services/__init__.py @@ -12,4 +12,5 @@ from .structured_query import * from .rows_query import * from .diagnosis import * from .collection import * -from .storage import * \ No newline at end of file +from .storage import * +from .tool_service import * \ No newline at end of file diff --git a/trustgraph-base/trustgraph/schema/services/tool_service.py b/trustgraph-base/trustgraph/schema/services/tool_service.py new file mode 100644 index 00000000..18315f29 --- /dev/null +++ b/trustgraph-base/trustgraph/schema/services/tool_service.py @@ -0,0 +1,25 @@ + +from dataclasses import dataclass + +from ..core.primitives import Error + + +@dataclass +class ToolServiceRequest: + """Request to a dynamically configured tool service.""" + # User context for multi-tenancy + user: str = "" + # Config values (collection, etc.) 
as JSON + config: str = "" + # Arguments from LLM as JSON + arguments: str = "" + + +@dataclass +class ToolServiceResponse: + """Response from a tool service.""" + error: Error | None = None + # Response text (the observation) + response: str = "" + # End of stream marker for streaming responses + end_of_stream: bool = False diff --git a/trustgraph-flow/pyproject.toml b/trustgraph-flow/pyproject.toml index b0a44bee..f447a8ef 100644 --- a/trustgraph-flow/pyproject.toml +++ b/trustgraph-flow/pyproject.toml @@ -122,6 +122,7 @@ triples-write-falkordb = "trustgraph.storage.triples.falkordb:run" triples-write-memgraph = "trustgraph.storage.triples.memgraph:run" triples-write-neo4j = "trustgraph.storage.triples.neo4j:run" wikipedia-lookup = "trustgraph.external.wikipedia:run" +joke-service = "trustgraph.tool_service.joke:run" [tool.setuptools.packages.find] include = ["trustgraph*"] diff --git a/trustgraph-flow/trustgraph/agent/react/service.py b/trustgraph-flow/trustgraph/agent/react/service.py index 1a44ef9e..1c96adef 100755 --- a/trustgraph-flow/trustgraph/agent/react/service.py +++ b/trustgraph-flow/trustgraph/agent/react/service.py @@ -17,7 +17,7 @@ from ... base import RowEmbeddingsQueryClientSpec, EmbeddingsClientSpec from ... schema import AgentRequest, AgentResponse, AgentStep, Error -from . tools import KnowledgeQueryImpl, TextCompletionImpl, McpToolImpl, PromptImpl, StructuredQueryImpl, RowEmbeddingsQueryImpl +from . tools import KnowledgeQueryImpl, TextCompletionImpl, McpToolImpl, PromptImpl, StructuredQueryImpl, RowEmbeddingsQueryImpl, ToolServiceImpl from . 
agent_manager import AgentManager from ..tool_filter import validate_tool_config, filter_tools_by_group_and_state, get_next_state @@ -51,6 +51,9 @@ class Processor(AgentService): additional_context="", ) + # Track active tool service clients for cleanup + self.tool_service_clients = {} + self.config_handlers.append(self.on_tools_config) self.register_specification( @@ -110,6 +113,16 @@ class Processor(AgentService): tools = {} + # Load tool-service configurations first + tool_services = {} + if "tool-service" in config: + for service_id, service_value in config["tool-service"].items(): + service_data = json.loads(service_value) + tool_services[service_id] = service_data + logger.debug(f"Loaded tool-service config: {service_id}") + + logger.info(f"Loaded {len(tool_services)} tool-service configurations") + # Load tool configurations from the new location if "tool" in config: for tool_id, tool_value in config["tool"].items(): @@ -177,6 +190,59 @@ class Processor(AgentService): limit=int(data.get("limit", 10)) # Max results ) arguments = RowEmbeddingsQueryImpl.get_arguments() + elif impl_id == "tool-service": + # Dynamic tool service - look up the service config + service_ref = data.get("service") + if not service_ref: + raise RuntimeError( + f"Tool {name} has type 'tool-service' but no 'service' reference" + ) + if service_ref not in tool_services: + raise RuntimeError( + f"Tool {name} references unknown tool-service '{service_ref}'" + ) + + service_config = tool_services[service_ref] + request_queue = service_config.get("request-queue") + response_queue = service_config.get("response-queue") + if not request_queue or not response_queue: + raise RuntimeError( + f"Tool-service '{service_ref}' must define 'request-queue' and 'response-queue'" + ) + + # Build config values from tool config + # Extract any config params defined by the service + config_params = service_config.get("config-params", []) + config_values = {} + for param in config_params: + param_name = 
param.get("name") if isinstance(param, dict) else param + if param_name in data: + config_values[param_name] = data[param_name] + elif isinstance(param, dict) and param.get("required", False): + raise RuntimeError( + f"Tool {name} missing required config param '{param_name}'" + ) + + # Arguments come from tool config + config_args = data.get("arguments", []) + arguments = [ + Argument( + name=arg.get("name"), + type=arg.get("type"), + description=arg.get("description") + ) + for arg in config_args + ] + + # Store queues for the implementation + impl = functools.partial( + ToolServiceImpl, + request_queue=request_queue, + response_queue=response_queue, + config_values=config_values, + arguments=arguments, + processor=self, + ) else: raise RuntimeError( f"Tool type {impl_id} not known" diff --git a/trustgraph-flow/trustgraph/agent/react/tools.py b/trustgraph-flow/trustgraph/agent/react/tools.py index 2b442a0d..18675084 100644 --- a/trustgraph-flow/trustgraph/agent/react/tools.py +++ b/trustgraph-flow/trustgraph/agent/react/tools.py @@ -202,3 +202,116 @@ class PromptImpl: id=self.template_id, variables=arguments ) + + +# This tool implementation invokes a dynamically configured tool service +class ToolServiceImpl: + """ + Implementation for dynamically pluggable tool services. + + Tool services are external Pulsar services that can be invoked as agent tools. + The service is configured via a tool-service descriptor that defines the queues, + and a tool descriptor that provides config values and argument definitions. + """ + + def __init__(self, context, request_queue, response_queue, config_values=None, arguments=None, processor=None): + """ + Initialize a tool service implementation. 
+ + Args: + context: The context function (provides user info) + request_queue: Full Pulsar topic for requests + response_queue: Full Pulsar topic for responses + config_values: Dict of config values (e.g., {"collection": "customers"}) + arguments: List of Argument objects defining the tool's parameters + processor: The Processor instance (for pubsub access) + """ + self.context = context + self.request_queue = request_queue + self.response_queue = response_queue + self.config_values = config_values or {} + self.arguments = arguments or [] + self.processor = processor + self._client = None + + def get_arguments(self): + return self.arguments + + async def _get_or_create_client(self): + """Get or create the tool service client.""" + if self._client is not None: + return self._client + + # Check if processor already has a client for this queue pair + client_key = f"{self.request_queue}|{self.response_queue}" + if client_key in self.processor.tool_service_clients: + self._client = self.processor.tool_service_clients[client_key] + return self._client + + # Import here to avoid circular imports + from trustgraph.base.tool_service_client import ToolServiceClient + from trustgraph.base.metrics import ProducerMetrics, SubscriberMetrics + from trustgraph.schema import ToolServiceRequest, ToolServiceResponse + import uuid + + request_metrics = ProducerMetrics( + processor=self.processor.id, + flow="tool-service", + name=self.request_queue + ) + response_metrics = SubscriberMetrics( + processor=self.processor.id, + flow="tool-service", + name=self.response_queue + ) + + # Create unique subscription for responses + subscription = f"{self.processor.id}--tool-service--{uuid.uuid4()}" + + self._client = ToolServiceClient( + backend=self.processor.pubsub, + subscription=subscription, + consumer_name=self.processor.id, + request_topic=self.request_queue, + request_schema=ToolServiceRequest, + request_metrics=request_metrics, + response_topic=self.response_queue, + 
response_schema=ToolServiceResponse, + response_metrics=response_metrics, + ) + + # Start the client + await self._client.start() + + # Register for cleanup + self.processor.tool_service_clients[client_key] = self._client + + logger.debug(f"Created tool service client for {self.request_queue}") + return self._client + + async def invoke(self, **arguments): + logger.debug(f"Tool service invocation: {self.request_queue}...") + logger.debug(f"Config: {self.config_values}") + logger.debug(f"Arguments: {arguments}") + + # Get user from context if available + user = "trustgraph" + if hasattr(self.context, '_user'): + user = self.context._user + + # Get or create the client + client = await self._get_or_create_client() + + # Call the tool service + response = await client.call( + user=user, + config=self.config_values, + arguments=arguments, + ) + + logger.debug(f"Tool service response: {response}") + + if isinstance(response, str): + return response + else: + return json.dumps(response) diff --git a/trustgraph-flow/trustgraph/tool_service/__init__.py b/trustgraph-flow/trustgraph/tool_service/__init__.py new file mode 100644 index 00000000..76d859b9 --- /dev/null +++ b/trustgraph-flow/trustgraph/tool_service/__init__.py @@ -0,0 +1 @@ +# Tool service implementations diff --git a/trustgraph-flow/trustgraph/tool_service/joke/__init__.py b/trustgraph-flow/trustgraph/tool_service/joke/__init__.py new file mode 100644 index 00000000..4322f49f --- /dev/null +++ b/trustgraph-flow/trustgraph/tool_service/joke/__init__.py @@ -0,0 +1,2 @@ +# Joke tool service +from .service import run diff --git a/trustgraph-flow/trustgraph/tool_service/joke/service.py b/trustgraph-flow/trustgraph/tool_service/joke/service.py new file mode 100644 index 00000000..d9b7cde0 --- /dev/null +++ b/trustgraph-flow/trustgraph/tool_service/joke/service.py @@ -0,0 +1,204 @@ +""" +Joke Tool Service - An example dynamic tool service. 
+
+This service demonstrates the tool service integration by:
+- Using the 'user' field to personalize responses
+- Using config params (style) to customize joke style
+- Using arguments (topic) to generate topic-specific jokes
+
+Example tool-service config:
+{
+    "id": "joke-service",
+    "request-queue": "non-persistent://tg/request/joke",
+    "response-queue": "non-persistent://tg/response/joke",
+    "config-params": [
+        {"name": "style", "required": false}
+    ]
+}
+
+Example tool config:
+{
+    "type": "tool-service",
+    "name": "tell-joke",
+    "description": "Tell a joke on a given topic",
+    "service": "joke-service",
+    "style": "pun",
+    "arguments": [
+        {
+            "name": "topic",
+            "type": "string",
+            "description": "The topic for the joke (e.g., programming, animals, food)"
+        }
+    ]
+}
+"""
+
+import random
+import logging
+
+from ... base import DynamicToolService
+
+# Module logger
+logger = logging.getLogger(__name__)
+
+default_ident = "joke-service"
+default_topic = "joke"
+
+# Joke database organized by topic and style
+JOKES = {
+    "programming": {
+        "pun": [
+            "Why do programmers prefer dark mode? Because light attracts bugs!",
+            "Why do Java developers wear glasses? Because they can't C#!",
+            "A SQL query walks into a bar, walks up to two tables and asks... 'Can I join you?'",
+            "Why was the JavaScript developer sad? Because he didn't Node how to Express himself!",
+        ],
+        "dad-joke": [
+            "I told my computer I needed a break, and now it won't stop sending me Kit-Kat ads.",
+            "My son asked me to explain what a linked list is. I said 'I'll tell you, and then I'll tell you again, and again...'",
+            "I asked my computer for a joke about UDP. I'm not sure if it got it.",
+        ],
+        "one-liner": [
+            "There are only 10 types of people: those who understand binary and those who don't.",
+            "A programmer's wife tells him: 'Go to the store and get a loaf of bread. If they have eggs, get a dozen.' He returns with 12 loaves.",
+            "99 little bugs in the code, 99 little bugs. 
Take one down, patch it around, 127 little bugs in the code.", + ], + }, + "llama": { + "pun": [ + "Why did the llama get a ticket? Because he was caught spitting in a no-spitting zone!", + "What do you call a llama who's a great musician? A llama del Rey!", + "Why did the llama cross the road? To prove he wasn't a chicken!", + ], + "dad-joke": [ + "What did the llama say when he got kicked out of the zoo? 'Alpaca my bags!'", + "Why don't llamas ever get lost? Because they always know the way to the Andes!", + "What do you call a llama with no legs? A woolly rug!", + ], + "one-liner": [ + "Llamas are great at meditation. They're always saying 'Dalai Llama.'", + "I asked a llama for directions. He said 'No probllama!'", + "Never trust a llama. They're always up to something woolly.", + ], + }, + "animals": { + "pun": [ + "What do you call a fish without eyes? A fsh!", + "Why don't scientists trust atoms? Because they make up everything... just like that cat who blamed the dog!", + "What do you call a bear with no teeth? A gummy bear!", + ], + "dad-joke": [ + "I tried to catch some fog earlier. I mist. My dog wasn't impressed either.", + "What do you call a dog that does magic tricks? A Labracadabrador!", + "Why do cows wear bells? Because their horns don't work!", + ], + "one-liner": [ + "I'm reading a book about anti-gravity. It's impossible to put down, unlike my cat.", + "A horse walks into a bar. The bartender asks 'Why the long face?'", + "What's orange and sounds like a parrot? A carrot!", + ], + }, + "food": { + "pun": [ + "I'm on a seafood diet. I see food and I eat it!", + "Why did the tomato turn red? Because it saw the salad dressing!", + "What do you call cheese that isn't yours? Nacho cheese!", + ], + "dad-joke": [ + "I used to hate facial hair, but then it grew on me. Speaking of growing, have you tried my garden salad?", + "Why don't eggs tell jokes? They'd crack each other up!", + "I told my wife she was drawing her eyebrows too high. 
She looked surprised, then made me a sandwich.", + ], + "one-liner": [ + "I'm reading a book about submarines and sandwiches. It's a sub-genre.", + "Broken puppets for sale. No strings attached. Also, free spaghetti!", + "I ordered a chicken and an egg online. I'll let you know which comes first.", + ], + }, + "default": { + "pun": [ + "Time flies like an arrow. Fruit flies like a banana!", + "I used to be a banker, but I lost interest.", + "I'm reading a book on the history of glue. I can't put it down!", + ], + "dad-joke": [ + "I'm afraid for the calendar. Its days are numbered.", + "I only know 25 letters of the alphabet. I don't know y.", + "Did you hear about the claustrophobic astronaut? He just needed a little space.", + ], + "one-liner": [ + "I told my wife she was drawing her eyebrows too high. She looked surprised.", + "I'm not lazy, I'm on energy-saving mode.", + "Parallel lines have so much in common. It's a shame they'll never meet.", + ], + }, +} + + +class Processor(DynamicToolService): + """ + Joke tool service that demonstrates the tool service integration. + """ + + def __init__(self, **params): + super(Processor, self).__init__(**params) + logger.info("Joke service initialized") + + async def invoke(self, user, config, arguments): + """ + Generate a joke based on the topic and style. 
+ + Args: + user: The user requesting the joke + config: Config values including 'style' (pun, dad-joke, one-liner) + arguments: Arguments including 'topic' (programming, animals, food) + + Returns: + A personalized joke string + """ + # Get style from config (default: random) + style = config.get("style", random.choice(["pun", "dad-joke", "one-liner"])) + + # Get topic from arguments (default: random) + topic = arguments.get("topic", "").lower() + + # Map topic to our categories + if "program" in topic or "code" in topic or "computer" in topic or "software" in topic: + category = "programming" + elif "llama" in topic: + category = "llama" + elif "animal" in topic or "dog" in topic or "cat" in topic or "bird" in topic: + category = "animals" + elif "food" in topic or "eat" in topic or "cook" in topic or "drink" in topic: + category = "food" + else: + category = "default" + + # Normalize style + if style not in ["pun", "dad-joke", "one-liner"]: + style = random.choice(["pun", "dad-joke", "one-liner"]) + + # Get jokes for this category and style + jokes = JOKES.get(category, JOKES["default"]).get(style, JOKES["default"]["pun"]) + + # Pick a random joke + joke = random.choice(jokes) + + # Personalize the response + response = f"Hey {user}! Here's a {style} for you:\n\n{joke}" + + logger.debug(f"Generated joke for user={user}, style={style}, topic={topic}") + + return response + + @staticmethod + def add_args(parser): + DynamicToolService.add_args(parser) + # Override the topic default for this service + for action in parser._actions: + if '--topic' in action.option_strings: + action.default = default_topic + break + + +def run(): + Processor.launch(default_ident, __doc__)