diff --git a/docs/tech-specs/confidence-based-agents.md b/docs/tech-specs/confidence-based-agents.md index dd649143..d00cc252 100644 --- a/docs/tech-specs/confidence-based-agents.md +++ b/docs/tech-specs/confidence-based-agents.md @@ -17,34 +17,34 @@ This document specifies a new agent architecture for TrustGraph that introduces #### 1.2 High-Level Architecture ``` -┌─────────────────────────────────────────────────────────────────┐ +┌──────────────────────────────────────────────────────────────────┐ │ Gateway Service Layer │ │ (dispatch/agent_confidence.py) │ -└────────────────────────────┬────────────────────────────────────┘ +└────────────────────────────┬─────────────────────────────────────┘ │ Pulsar Message Bus │ ┌─────────────────────────────┴────────────────────────────────────┐ │ Confidence Agent Service │ │ (agent/confidence/service.py) │ -│ │ -│ ┌──────────────┐ ┌───────────────┐ ┌────────────────┐ │ -│ │ Planner │ │ Flow Controller│ │ Confidence │ │ -│ │ Module │─▶│ Module │─▶│ Evaluator │ │ -│ └──────────────┘ └───────────────┘ └────────────────┘ │ +│ │ +│ ┌──────────────┐ ┌─────────────────┐ ┌────────────────┐ │ +│ │ Planner │ │ Flow Controller │ │ Confidence │ │ +│ │ Module │─▶│ Module │─▶│ Evaluator │ │ +│ └──────────────┘ └─────────────────┘ └────────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ -│ ┌──────────────┐ ┌───────────────┐ ┌────────────────┐ │ -│ │ Execution │ │ Memory │ │ Audit │ │ -│ │ Engine │◄─│ Manager │ │ Logger │ │ -│ └──────────────┘ └───────────────┘ └────────────────┘ │ -└───────────────────────────────────────────────────────────────┘ +│ ┌──────────────┐ ┌───────────────┐ ┌────────────────┐ │ +│ │ Execution │ │ Memory │ │ Audit │ │ +│ │ Engine │◄──│ Manager │ │ Logger │ │ +│ └──────────────┘ └───────────────┘ └────────────────┘ │ +└──────────────────────────────────────────────────────────────────┘ │ Tool Service Clients │ - ┌───────────────┬───────┴────────┬────────────────┐ + ┌───────────────┬───────┴─────────┬─────────────────┐ ▼ ▼ ▼ ▼ -KnowledgeQuery TextCompletion McpTool PromptService +KnowledgeQuery TextCompletion McpTool PromptService ``` ### 2. Module Specifications @@ -70,79 +70,51 @@ trustgraph-flow/trustgraph/agent/confidence/ New schemas in `trustgraph-base/trustgraph/schema/services/agent_confidence.py`: -```python -from pulsar.schema import Record, String, Array, Map, Float, Integer, Boolean +**ConfidenceMetrics** +- `score`: Float - Confidence score (0.0 to 1.0) +- `reasoning`: String - Explanation of score calculation +- `retry_count`: Integer - Number of retries attempted -class ConfidenceMetrics(Record): - score = Float() - reasoning = String() - retry_count = Integer() - -class ExecutionStep(Record): - id = String() - function = String() - arguments = Map(String()) - dependencies = Array(String()) - confidence_threshold = Float() - timeout_ms = Integer() - -class ExecutionPlan(Record): - id = String() - steps = Array(ExecutionStep) - context = Map(String()) - -class StepResult(Record): - step_id = String() - success = Boolean() - output = String() - confidence = ConfidenceMetrics() - execution_time_ms = Integer() - -class ConfidenceAgentRequest(Record): - question = String() - confidence_threshold = Float() - max_retries = Integer() - override_enabled = Boolean() - context = Map(String()) - -class ConfidenceAgentResponse(Record): - answer = String() - plan = ExecutionPlan() - results = Array(StepResult) - final_confidence = Float() - audit_trail = String() - error = Error() -``` +**ExecutionStep** +- `id`: String - Unique step identifier +- `function`: String - Tool/function to execute +- `arguments`: Map(String) - Arguments for the function +- `dependencies`: Array(String) - IDs of prerequisite steps +- `confidence_threshold`: Float - Minimum acceptable confidence +- `timeout_ms`: Integer - Execution timeout + +**ExecutionPlan** +- `id`: String - Plan identifier +- `steps`: Array(ExecutionStep) - Ordered execution steps +- `context`: Map(String) - Global context for plan + +**StepResult** +- `step_id`: String - Reference to ExecutionStep +- `success`: Boolean - Execution success status +- `output`: String - Step execution output +- `confidence`: ConfidenceMetrics - Confidence evaluation +- `execution_time_ms`: Integer - Actual execution time + +**ConfidenceAgentRequest** +- `question`: String - User query +- `confidence_threshold`: Float - Global confidence threshold +- `max_retries`: Integer - Maximum retry attempts +- `override_enabled`: Boolean - Allow user overrides +- `context`: Map(String) - Request context + +**ConfidenceAgentResponse** +- `answer`: String - Final answer to user +- `plan`: ExecutionPlan - Generated execution plan +- `results`: Array(StepResult) - All step results +- `final_confidence`: Float - Overall confidence score +- `audit_trail`: String - Reference to detailed audit log +- `error`: Error - Error details if failed ### 3. Module Implementation Details #### 3.1 Planner Module (`planner.py`) -```python -class PlannerModule: - """ - Generates structured execution plans from user requests. - Uses LLM to create confidence-scored step sequences. - """ - - def __init__(self, llm_client, tool_registry): - self.llm_client = llm_client - self.tool_registry = tool_registry - - async def generate_plan( - self, - request: str, - context: dict, - available_tools: list - ) -> ExecutionPlan: - """ - Generate execution plan with confidence thresholds. - - Returns: - ExecutionPlan with steps, dependencies, and confidence scores - """ - # Implementation details... -``` +The Planner Module generates structured execution plans from user requests using an LLM to create confidence-scored step sequences. **Key Responsibilities:** - Parse user requests into structured plans @@ -152,33 +124,13 @@ class PlannerModule: #### 3.2 Flow Controller (`flow_controller.py`) -```python -class FlowController: - """ - Orchestrates plan execution with confidence-based control flow. - """ - - def __init__(self, executor, memory_manager, confidence_evaluator): - self.executor = executor - self.memory = memory_manager - self.evaluator = confidence_evaluator - - async def execute_plan( - self, - plan: ExecutionPlan, - config: dict - ) -> list[StepResult]: - """ - Execute plan with confidence threshold enforcement. - - Implements: - - Step dependency resolution - - Confidence-based retry logic - - User override handling - - Graceful failure modes - """ - # Implementation details... -``` +The Flow Controller orchestrates plan execution with confidence-based control flow, managing step dependencies and retry logic. + +**Key Capabilities:** +- Step dependency resolution +- Confidence-based retry logic +- User override handling +- Graceful failure modes **Configuration Schema:** ```yaml @@ -193,30 +145,7 @@ confidence_agent: #### 3.3 Confidence Evaluator (`confidence.py`) -```python -class ConfidenceEvaluator: - """ - Evaluates confidence scores for execution results. - """ - - async def evaluate_result( - self, - function_name: str, - input_args: dict, - output: any, - execution_context: dict - ) -> ConfidenceMetrics: - """ - Calculate confidence score for a step result. - - Factors: - - Output completeness - - Semantic consistency - - Historical success rates - - Function-specific rules - """ - # Implementation details... -``` +The Confidence Evaluator calculates confidence scores for execution results based on multiple factors to ensure reliability. **Confidence Scoring Factors:** - Graph query result size and consistency @@ -226,32 +155,7 @@ class ConfidenceEvaluator: #### 3.4 Memory Manager (`memory.py`) -```python -class MemoryManager: - """ - Manages inter-step data flow and context preservation. - """ - - def __init__(self, max_context_size: int = 8192): - self.max_context_size = max_context_size - self.step_outputs = {} - self.global_context = {} - - async def store_result( - self, - step_id: str, - result: any, - metadata: dict - ): - """Store step result with intelligent pruning.""" - - async def get_context_for_step( - self, - step: ExecutionStep, - dependencies: list[str] - ) -> dict: - """Build context for step execution.""" -``` +The Memory Manager handles inter-step data flow and context preservation, ensuring efficient memory usage while maintaining necessary state. **Memory Strategies:** - Selective context passing based on dependencies @@ -261,30 +165,7 @@ class MemoryManager: #### 3.5 Executor Module (`executor.py`) -```python -class StepExecutor: - """ - Executes individual plan steps using registered tools. - """ - - def __init__(self, tool_clients: dict): - self.tool_clients = tool_clients - - async def execute_step( - self, - step: ExecutionStep, - context: dict - ) -> StepResult: - """ - Execute a single step with appropriate tool. - - Handles: - - Tool selection and invocation - - Error handling and timeouts - - Result transformation - """ - # Implementation details... -``` +The Step Executor handles individual plan step execution using registered tools, managing tool selection, error handling, and result transformation. **Tool Mapping:** - `GraphQuery` → GraphRagClient @@ -294,60 +175,23 @@ class StepExecutor: #### 3.6 Service Implementation (`service.py`) -```python -class ConfidenceAgentService(AgentService): - """ - Main service class for confidence-based agent. - """ - - def __init__(self, **params): - super().__init__(**params) - - # Initialize modules - self.planner = PlannerModule(...) - self.flow_controller = FlowController(...) - self.executor = StepExecutor(...) - - # Register client specifications - self.register_specification(TextCompletionClientSpec(...)) - self.register_specification(GraphRagClientSpec(...)) - self.register_specification(ToolClientSpec(...)) - - async def handle_request( - self, - request: ConfidenceAgentRequest - ) -> ConfidenceAgentResponse: - """ - Main request handler implementing confidence-based flow. - """ - # 1. Generate plan - plan = await self.planner.generate_plan(...) - - # 2. Execute with confidence control - results = await self.flow_controller.execute_plan(...) - - # 3. Generate response - return ConfidenceAgentResponse(...) -``` +The main service class coordinates all confidence agent components and handles request/response flow through the Pulsar message bus. + +**Service Workflow:** +1. Generate execution plan via Planner Module +2. Execute plan with confidence control via Flow Controller +3. Generate response with confidence metrics and audit trail + +**Client Specifications:** +- TextCompletionClientSpec for LLM operations +- GraphRagClientSpec for knowledge graph queries +- ToolClientSpec for MCP tool invocations ### 4. Integration Points #### 4.1 Gateway Integration -New dispatcher in `trustgraph-flow/trustgraph/gateway/dispatch/agent_confidence.py`: - -```python -from ... schema import ConfidenceAgentRequest, ConfidenceAgentResponse -from . requestor import ServiceRequestor - -class ConfidenceAgentRequestor(ServiceRequestor): - def __init__(self, pulsar_client, request_queue, response_queue, ...): - super().__init__( - request_schema=ConfidenceAgentRequest, - response_schema=ConfidenceAgentResponse, - ... - ) -``` +A new dispatcher will be created in `trustgraph-flow/trustgraph/gateway/dispatch/agent_confidence.py` that extends the existing ServiceRequestor pattern, using the ConfidenceAgentRequest and ConfidenceAgentResponse schemas for Pulsar message serialization. #### 4.2 Configuration Integration @@ -417,25 +261,13 @@ sequenceDiagram #### 5.2 Confidence-Based Control Flow -```python -async def execute_with_confidence(step, context, config): - retries = 0 - while retries < config.max_retries: - result = await executor.execute_step(step, context) - confidence = await evaluator.evaluate_result(result) - - if confidence.score >= step.confidence_threshold: - return result - - retries += 1 - await asyncio.sleep(config.retry_backoff_factor ** retries) - - # Max retries exceeded - if config.override_enabled: - return await request_user_override(step, result) - else: - raise LowConfidenceError(step, confidence) -``` +The control flow implements a retry loop with exponential backoff: + +1. Execute step and evaluate confidence +2. If confidence meets threshold, proceed to next step +3. If below threshold, retry with backoff delay +4. After max retries, either request user override or fail +5. Log all attempts and decisions for audit trail ### 6. Monitoring and Observability @@ -443,27 +275,18 @@ async def execute_with_confidence(step, context, config): New metrics to expose via Prometheus: -```python -# Confidence metrics -confidence_score_histogram = Histogram( - 'agent_confidence_score', - 'Confidence scores distribution', - buckets=[0.1, 0.3, 0.5, 0.7, 0.9, 1.0] -) +**Confidence Metrics:** +- `agent_confidence_score` - Histogram of confidence scores with buckets [0.1, 0.3, 0.5, 0.7, 0.9, 1.0] +- `agent_confidence_failures` - Counter of steps failing confidence thresholds -# Retry metrics -retry_count_counter = Counter( - 'agent_retry_count', - 'Number of retries by function', - ['function_name'] -) +**Retry Metrics:** +- `agent_retry_count` - Counter of retries by function name +- `agent_retry_success_rate` - Gauge of retry success percentage -# Plan execution metrics -plan_execution_duration = Histogram( - 'agent_plan_execution_seconds', - 'Plan execution duration' -) -``` +**Plan Execution Metrics:** +- `agent_plan_execution_seconds` - Histogram of total plan execution time +- `agent_step_execution_seconds` - Histogram of individual step execution time +- `agent_plan_complexity` - Histogram of number of steps per plan #### 6.2 Audit Trail @@ -507,48 +330,31 @@ Structured audit logging format: Location: `tests/unit/test_agent/test_confidence/` -```python -# test_planner.py -async def test_plan_generation(): - planner = PlannerModule(mock_llm, mock_tools) - plan = await planner.generate_plan("Extract entities from document") - assert len(plan.steps) > 0 - assert all(s.confidence_threshold > 0 for s in plan.steps) - -# test_confidence.py -async def test_confidence_evaluation(): - evaluator = ConfidenceEvaluator() - metrics = await evaluator.evaluate_result(...) - assert 0 <= metrics.score <= 1 -``` +**Test Coverage Areas:** +- Plan generation with various request types +- Confidence score calculation and validation +- Memory manager context handling +- Flow controller retry logic +- Executor tool mapping and error handling #### 7.2 Integration Tests Location: `tests/integration/test_agent_confidence/` -```python -async def test_end_to_end_confidence_flow(): - # Test complete flow with mock services - request = ConfidenceAgentRequest( - question="What entities are in the knowledge graph?", - confidence_threshold=0.7 - ) - response = await agent.handle_request(request) - assert response.final_confidence >= 0.7 -``` +**Test Scenarios:** +- End-to-end confidence flow with mock services +- Multi-step plan execution with dependencies +- Retry behavior under various confidence scores +- User override flow simulation +- Fallback to ReAct agent on failure #### 7.3 Contract Tests -Ensure compatibility with existing service contracts: - -```python -async def test_pulsar_message_compatibility(): - # Verify schema serialization/deserialization - request = ConfidenceAgentRequest(...) - serialized = schema.encode(request) - deserialized = schema.decode(serialized) - assert request == deserialized -``` +**Contract Validation:** +- Pulsar message schema serialization/deserialization +- Compatibility with existing tool service interfaces +- Gateway dispatcher protocol compliance +- Response format consistency with ReAct agent where applicable ### 8. Migration and Rollout @@ -752,4 +558,4 @@ config: "final_confidence": 0.92, "audit_trail": "execution-log-url" } -``` \ No newline at end of file +```