Remove lots of overly-detailed code from the tech spec

This commit is contained in:
Cyber MacGeddon 2025-09-03 10:12:06 +01:00
parent e5abae6ade
commit 4e68b3581a

View file

@ -17,10 +17,10 @@ This document specifies a new agent architecture for TrustGraph that introduces
#### 1.2 High-Level Architecture #### 1.2 High-Level Architecture
``` ```
┌─────────────────────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────────────────
│ Gateway Service Layer │ │ Gateway Service Layer │
│ (dispatch/agent_confidence.py) │ │ (dispatch/agent_confidence.py) │
└────────────────────────────┬────────────────────────────────────┘ └────────────────────────────┬────────────────────────────────────
Pulsar Message Bus Pulsar Message Bus
@ -28,21 +28,21 @@ This document specifies a new agent architecture for TrustGraph that introduces
│ Confidence Agent Service │ │ Confidence Agent Service │
│ (agent/confidence/service.py) │ │ (agent/confidence/service.py) │
│ │ │ │
│ ┌──────────────┐ ───────────────┐ ┌────────────────┐ │ ┌──────────────┐ ┌─────────────────┐ ┌────────────────┐ │
│ │ Planner │ │ Flow Controller│ │ Confidence │ │ │ Planner │ │ Flow Controller │ │ Confidence │
│ │ Module │─▶│ Module │─▶│ Evaluator │ │ │ │ Module │─▶│ Module │─▶│ Evaluator │ │
│ └──────────────┘ ───────────────┘ └────────────────┘ │ └──────────────┘ └─────────────────┘ └────────────────┘ │
│ │ │ │ │ │ │ │ │ │
│ ▼ ▼ ▼ │ │ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌───────────────┐ ┌────────────────┐ │ │ ┌──────────────┐ ┌───────────────┐ ┌────────────────┐ │
│ │ Execution │ │ Memory │ │ Audit │ │ │ │ Execution │ │ Memory │ │ Audit │ │
│ │ Engine │◄─│ Manager │ │ Logger │ │ │ Engine │◄─│ Manager │ │ Logger │ │
│ └──────────────┘ └───────────────┘ └────────────────┘ │ │ └──────────────┘ └───────────────┘ └────────────────┘ │
└───────────────────────────────────────────────────────────────┘ └──────────────────────────────────────────────────────────────────
Tool Service Clients Tool Service Clients
┌───────────────┬───────┴────────────────────────┐ ┌───────────────┬───────┴─────────┬─────────────────┐
▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼
KnowledgeQuery TextCompletion McpTool PromptService KnowledgeQuery TextCompletion McpTool PromptService
``` ```
@ -70,79 +70,51 @@ trustgraph-flow/trustgraph/agent/confidence/
New schemas in `trustgraph-base/trustgraph/schema/services/agent_confidence.py`: New schemas in `trustgraph-base/trustgraph/schema/services/agent_confidence.py`:
```python **ConfidenceMetrics**
from pulsar.schema import Record, String, Array, Map, Float, Integer, Boolean - `score`: Float - Confidence score (0.0 to 1.0)
- `reasoning`: String - Explanation of score calculation
- `retry_count`: Integer - Number of retries attempted
class ConfidenceMetrics(Record): **ExecutionStep**
score = Float() - `id`: String - Unique step identifier
reasoning = String() - `function`: String - Tool/function to execute
retry_count = Integer() - `arguments`: Map(String) - Arguments for the function
- `dependencies`: Array(String) - IDs of prerequisite steps
- `confidence_threshold`: Float - Minimum acceptable confidence
- `timeout_ms`: Integer - Execution timeout
class ExecutionStep(Record): **ExecutionPlan**
id = String() - `id`: String - Plan identifier
function = String() - `steps`: Array(ExecutionStep) - Ordered execution steps
arguments = Map(String()) - `context`: Map(String) - Global context for plan
dependencies = Array(String())
confidence_threshold = Float()
timeout_ms = Integer()
class ExecutionPlan(Record): **StepResult**
id = String() - `step_id`: String - Reference to ExecutionStep
steps = Array(ExecutionStep) - `success`: Boolean - Execution success status
context = Map(String()) - `output`: String - Step execution output
- `confidence`: ConfidenceMetrics - Confidence evaluation
- `execution_time_ms`: Integer - Actual execution time
class StepResult(Record): **ConfidenceAgentRequest**
step_id = String() - `question`: String - User query
success = Boolean() - `confidence_threshold`: Float - Global confidence threshold
output = String() - `max_retries`: Integer - Maximum retry attempts
confidence = ConfidenceMetrics() - `override_enabled`: Boolean - Allow user overrides
execution_time_ms = Integer() - `context`: Map(String) - Request context
class ConfidenceAgentRequest(Record): **ConfidenceAgentResponse**
question = String() - `answer`: String - Final answer to user
confidence_threshold = Float() - `plan`: ExecutionPlan - Generated execution plan
max_retries = Integer() - `results`: Array(StepResult) - All step results
override_enabled = Boolean() - `final_confidence`: Float - Overall confidence score
context = Map(String()) - `audit_trail`: String - Reference to detailed audit log
- `error`: Error - Error details if failed
class ConfidenceAgentResponse(Record):
answer = String()
plan = ExecutionPlan()
results = Array(StepResult)
final_confidence = Float()
audit_trail = String()
error = Error()
```
### 3. Module Implementation Details ### 3. Module Implementation Details
#### 3.1 Planner Module (`planner.py`) #### 3.1 Planner Module (`planner.py`)
```python The Planner Module generates structured execution plans from user requests using an LLM to create confidence-scored step sequences.
class PlannerModule:
"""
Generates structured execution plans from user requests.
Uses LLM to create confidence-scored step sequences.
"""
def __init__(self, llm_client, tool_registry):
self.llm_client = llm_client
self.tool_registry = tool_registry
async def generate_plan(
self,
request: str,
context: dict,
available_tools: list
) -> ExecutionPlan:
"""
Generate execution plan with confidence thresholds.
Returns:
ExecutionPlan with steps, dependencies, and confidence scores
"""
# Implementation details...
```
**Key Responsibilities:** **Key Responsibilities:**
- Parse user requests into structured plans - Parse user requests into structured plans
@ -152,33 +124,13 @@ class PlannerModule:
#### 3.2 Flow Controller (`flow_controller.py`) #### 3.2 Flow Controller (`flow_controller.py`)
```python The Flow Controller orchestrates plan execution with confidence-based control flow, managing step dependencies and retry logic.
class FlowController:
"""
Orchestrates plan execution with confidence-based control flow.
"""
def __init__(self, executor, memory_manager, confidence_evaluator): **Key Capabilities:**
self.executor = executor - Step dependency resolution
self.memory = memory_manager - Confidence-based retry logic
self.evaluator = confidence_evaluator - User override handling
- Graceful failure modes
async def execute_plan(
self,
plan: ExecutionPlan,
config: dict
) -> list[StepResult]:
"""
Execute plan with confidence threshold enforcement.
Implements:
- Step dependency resolution
- Confidence-based retry logic
- User override handling
- Graceful failure modes
"""
# Implementation details...
```
**Configuration Schema:** **Configuration Schema:**
```yaml ```yaml
@ -193,30 +145,7 @@ confidence_agent:
#### 3.3 Confidence Evaluator (`confidence.py`) #### 3.3 Confidence Evaluator (`confidence.py`)
```python The Confidence Evaluator calculates confidence scores for execution results based on multiple factors to ensure reliability.
class ConfidenceEvaluator:
"""
Evaluates confidence scores for execution results.
"""
async def evaluate_result(
self,
function_name: str,
input_args: dict,
output: any,
execution_context: dict
) -> ConfidenceMetrics:
"""
Calculate confidence score for a step result.
Factors:
- Output completeness
- Semantic consistency
- Historical success rates
- Function-specific rules
"""
# Implementation details...
```
**Confidence Scoring Factors:** **Confidence Scoring Factors:**
- Graph query result size and consistency - Graph query result size and consistency
@ -226,32 +155,7 @@ class ConfidenceEvaluator:
#### 3.4 Memory Manager (`memory.py`) #### 3.4 Memory Manager (`memory.py`)
```python The Memory Manager handles inter-step data flow and context preservation, ensuring efficient memory usage while maintaining necessary state.
class MemoryManager:
"""
Manages inter-step data flow and context preservation.
"""
def __init__(self, max_context_size: int = 8192):
self.max_context_size = max_context_size
self.step_outputs = {}
self.global_context = {}
async def store_result(
self,
step_id: str,
result: any,
metadata: dict
):
"""Store step result with intelligent pruning."""
async def get_context_for_step(
self,
step: ExecutionStep,
dependencies: list[str]
) -> dict:
"""Build context for step execution."""
```
**Memory Strategies:** **Memory Strategies:**
- Selective context passing based on dependencies - Selective context passing based on dependencies
@ -261,30 +165,7 @@ class MemoryManager:
#### 3.5 Executor Module (`executor.py`) #### 3.5 Executor Module (`executor.py`)
```python The Step Executor handles individual plan step execution using registered tools, managing tool selection, error handling, and result transformation.
class StepExecutor:
"""
Executes individual plan steps using registered tools.
"""
def __init__(self, tool_clients: dict):
self.tool_clients = tool_clients
async def execute_step(
self,
step: ExecutionStep,
context: dict
) -> StepResult:
"""
Execute a single step with appropriate tool.
Handles:
- Tool selection and invocation
- Error handling and timeouts
- Result transformation
"""
# Implementation details...
```
**Tool Mapping:** **Tool Mapping:**
- `GraphQuery` → GraphRagClient - `GraphQuery` → GraphRagClient
@ -294,60 +175,23 @@ class StepExecutor:
#### 3.6 Service Implementation (`service.py`) #### 3.6 Service Implementation (`service.py`)
```python The main service class coordinates all confidence agent components and handles request/response flow through the Pulsar message bus.
class ConfidenceAgentService(AgentService):
"""
Main service class for confidence-based agent.
"""
def __init__(self, **params): **Service Workflow:**
super().__init__(**params) 1. Generate execution plan via Planner Module
2. Execute plan with confidence control via Flow Controller
3. Generate response with confidence metrics and audit trail
# Initialize modules **Client Specifications:**
self.planner = PlannerModule(...) - TextCompletionClientSpec for LLM operations
self.flow_controller = FlowController(...) - GraphRagClientSpec for knowledge graph queries
self.executor = StepExecutor(...) - ToolClientSpec for MCP tool invocations
# Register client specifications
self.register_specification(TextCompletionClientSpec(...))
self.register_specification(GraphRagClientSpec(...))
self.register_specification(ToolClientSpec(...))
async def handle_request(
self,
request: ConfidenceAgentRequest
) -> ConfidenceAgentResponse:
"""
Main request handler implementing confidence-based flow.
"""
# 1. Generate plan
plan = await self.planner.generate_plan(...)
# 2. Execute with confidence control
results = await self.flow_controller.execute_plan(...)
# 3. Generate response
return ConfidenceAgentResponse(...)
```
### 4. Integration Points ### 4. Integration Points
#### 4.1 Gateway Integration #### 4.1 Gateway Integration
New dispatcher in `trustgraph-flow/trustgraph/gateway/dispatch/agent_confidence.py`: A new dispatcher will be created in `trustgraph-flow/trustgraph/gateway/dispatch/agent_confidence.py` that extends the existing ServiceRequestor pattern, using the ConfidenceAgentRequest and ConfidenceAgentResponse schemas for Pulsar message serialization.
```python
from ... schema import ConfidenceAgentRequest, ConfidenceAgentResponse
from . requestor import ServiceRequestor
class ConfidenceAgentRequestor(ServiceRequestor):
def __init__(self, pulsar_client, request_queue, response_queue, ...):
super().__init__(
request_schema=ConfidenceAgentRequest,
response_schema=ConfidenceAgentResponse,
...
)
```
#### 4.2 Configuration Integration #### 4.2 Configuration Integration
@ -417,25 +261,13 @@ sequenceDiagram
#### 5.2 Confidence-Based Control Flow #### 5.2 Confidence-Based Control Flow
```python The control flow implements a retry loop with exponential backoff:
async def execute_with_confidence(step, context, config):
retries = 0
while retries < config.max_retries:
result = await executor.execute_step(step, context)
confidence = await evaluator.evaluate_result(result)
if confidence.score >= step.confidence_threshold: 1. Execute step and evaluate confidence
return result 2. If confidence meets threshold, proceed to next step
3. If below threshold, retry with backoff delay
retries += 1 4. After max retries, either request user override or fail
await asyncio.sleep(config.retry_backoff_factor ** retries) 5. Log all attempts and decisions for audit trail
# Max retries exceeded
if config.override_enabled:
return await request_user_override(step, result)
else:
raise LowConfidenceError(step, confidence)
```
### 6. Monitoring and Observability ### 6. Monitoring and Observability
@ -443,27 +275,18 @@ async def execute_with_confidence(step, context, config):
New metrics to expose via Prometheus: New metrics to expose via Prometheus:
```python **Confidence Metrics:**
# Confidence metrics - `agent_confidence_score` - Histogram of confidence scores with buckets [0.1, 0.3, 0.5, 0.7, 0.9, 1.0]
confidence_score_histogram = Histogram( - `agent_confidence_failures` - Counter of steps failing confidence thresholds
'agent_confidence_score',
'Confidence scores distribution',
buckets=[0.1, 0.3, 0.5, 0.7, 0.9, 1.0]
)
# Retry metrics **Retry Metrics:**
retry_count_counter = Counter( - `agent_retry_count` - Counter of retries by function name
'agent_retry_count', - `agent_retry_success_rate` - Gauge of retry success percentage
'Number of retries by function',
['function_name']
)
# Plan execution metrics **Plan Execution Metrics:**
plan_execution_duration = Histogram( - `agent_plan_execution_seconds` - Histogram of total plan execution time
'agent_plan_execution_seconds', - `agent_step_execution_seconds` - Histogram of individual step execution time
'Plan execution duration' - `agent_plan_complexity` - Histogram of number of steps per plan
)
```
#### 6.2 Audit Trail #### 6.2 Audit Trail
@ -507,48 +330,31 @@ Structured audit logging format:
Location: `tests/unit/test_agent/test_confidence/` Location: `tests/unit/test_agent/test_confidence/`
```python **Test Coverage Areas:**
# test_planner.py - Plan generation with various request types
async def test_plan_generation(): - Confidence score calculation and validation
planner = PlannerModule(mock_llm, mock_tools) - Memory manager context handling
plan = await planner.generate_plan("Extract entities from document") - Flow controller retry logic
assert len(plan.steps) > 0 - Executor tool mapping and error handling
assert all(s.confidence_threshold > 0 for s in plan.steps)
# test_confidence.py
async def test_confidence_evaluation():
evaluator = ConfidenceEvaluator()
metrics = await evaluator.evaluate_result(...)
assert 0 <= metrics.score <= 1
```
#### 7.2 Integration Tests #### 7.2 Integration Tests
Location: `tests/integration/test_agent_confidence/` Location: `tests/integration/test_agent_confidence/`
```python **Test Scenarios:**
async def test_end_to_end_confidence_flow(): - End-to-end confidence flow with mock services
# Test complete flow with mock services - Multi-step plan execution with dependencies
request = ConfidenceAgentRequest( - Retry behavior under various confidence scores
question="What entities are in the knowledge graph?", - User override flow simulation
confidence_threshold=0.7 - Fallback to ReAct agent on failure
)
response = await agent.handle_request(request)
assert response.final_confidence >= 0.7
```
#### 7.3 Contract Tests #### 7.3 Contract Tests
Ensure compatibility with existing service contracts: **Contract Validation:**
- Pulsar message schema serialization/deserialization
```python - Compatibility with existing tool service interfaces
async def test_pulsar_message_compatibility(): - Gateway dispatcher protocol compliance
# Verify schema serialization/deserialization - Response format consistency with ReAct agent where applicable
request = ConfidenceAgentRequest(...)
serialized = schema.encode(request)
deserialized = schema.decode(serialized)
assert request == deserialized
```
### 8. Migration and Rollout ### 8. Migration and Rollout