mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-06-08 15:05:17 +02:00
update roles ut
This commit is contained in:
parent
8fe00a1788
commit
e8ab02b203
4 changed files with 726 additions and 35 deletions
|
|
@ -1,21 +1,143 @@
|
|||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from metagpt.const import TEST_DATA_PATH
|
||||
from metagpt.actions.di.execute_nb_code import ExecuteNbCode
|
||||
from metagpt.actions.di.write_analysis_code import WriteAnalysisCode
|
||||
from metagpt.roles.di.data_analyst import DataAnalyst
|
||||
from metagpt.roles.di.role_zero import RoleZero
|
||||
from metagpt.strategy.task_type import TaskType
|
||||
from metagpt.tools.tool_recommend import BM25ToolRecommender
|
||||
|
||||
|
||||
@pytest.mark.skip
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
("query", "filename"), [("similarity search about '有哪些需求描述?' in document ", TEST_DATA_PATH / "requirements/2.pdf")]
|
||||
)
|
||||
async def test_similarity_search(query, filename):
|
||||
di = DataAnalyst()
|
||||
query += f"'{str(filename)}'"
|
||||
|
||||
rsp = await di.run(query)
|
||||
assert rsp
|
||||
@pytest.fixture
|
||||
def data_analyst():
|
||||
analyst = DataAnalyst()
|
||||
analyst.planner = MagicMock()
|
||||
analyst.planner.plan = MagicMock()
|
||||
analyst.rc = MagicMock()
|
||||
analyst.rc.working_memory = MagicMock()
|
||||
analyst.rc.memory = MagicMock()
|
||||
return analyst
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-s"])
|
||||
@pytest.fixture
|
||||
def mock_execute_code():
|
||||
with patch('metagpt.actions.di.execute_nb_code.ExecuteNbCode') as mock:
|
||||
instance = mock.return_value
|
||||
instance.init_code = AsyncMock()
|
||||
instance.run = AsyncMock()
|
||||
yield instance
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_write_code():
|
||||
with patch('metagpt.actions.di.write_analysis_code.WriteAnalysisCode') as mock:
|
||||
instance = mock.return_value
|
||||
instance.run = AsyncMock()
|
||||
yield instance
|
||||
|
||||
|
||||
class TestDataAnalyst:
|
||||
def test_init(self):
|
||||
analyst = DataAnalyst()
|
||||
assert analyst.name == "David"
|
||||
assert analyst.profile == "DataAnalyst"
|
||||
assert "Browser" in analyst.tools
|
||||
assert isinstance(analyst.write_code, WriteAnalysisCode)
|
||||
assert isinstance(analyst.execute_code, ExecuteNbCode)
|
||||
|
||||
def test_set_custom_tool(self):
|
||||
# 测试有自定义工具的情况
|
||||
analyst = DataAnalyst()
|
||||
analyst.custom_tools = ["web scraping", "Terminal"]
|
||||
analyst.custom_tool_recommender = None # 确保初始值为None
|
||||
analyst.set_custom_tool()
|
||||
assert isinstance(analyst.custom_tool_recommender, BM25ToolRecommender)
|
||||
|
||||
# 测试没有自定义工具的情况
|
||||
analyst = DataAnalyst()
|
||||
analyst.custom_tools = []
|
||||
analyst.custom_tool_recommender = BM25ToolRecommender(tools=["some_tool"], force=True) # 设置一个初始值
|
||||
analyst.set_custom_tool()
|
||||
assert isinstance(analyst.custom_tool_recommender, BM25ToolRecommender) # 验证即使没有自定义工具,现有的推荐器也保持不变
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_and_exec_code_no_task(self, data_analyst):
|
||||
data_analyst.planner.current_task = None
|
||||
result = await data_analyst.write_and_exec_code()
|
||||
assert "No current_task found" in result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_and_exec_code_success(self, data_analyst, mock_execute_code, mock_write_code):
|
||||
# Setup mocks
|
||||
data_analyst.planner.current_task = MagicMock()
|
||||
data_analyst.planner.get_plan_status.return_value = "Plan status"
|
||||
data_analyst.custom_tool_recommender = MagicMock()
|
||||
data_analyst.custom_tool_recommender.get_recommended_tool_info = AsyncMock(return_value="Tool info")
|
||||
|
||||
mock_write_code.run.return_value = "test code"
|
||||
mock_execute_code.run.return_value = ("Success result", True)
|
||||
|
||||
result = await data_analyst.write_and_exec_code("test instruction")
|
||||
|
||||
assert "Success" in result
|
||||
assert mock_execute_code.init_code.called
|
||||
assert mock_write_code.run.called
|
||||
data_analyst.rc.working_memory.add.assert_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_and_exec_code_failure(self, data_analyst, mock_execute_code, mock_write_code):
|
||||
# Setup mocks
|
||||
data_analyst.planner.current_task = MagicMock()
|
||||
data_analyst.planner.get_plan_status.return_value = "Plan status"
|
||||
data_analyst.custom_tool_recommender = None
|
||||
|
||||
mock_write_code.run.return_value = "test code"
|
||||
mock_execute_code.run.return_value = ("Failed result", False)
|
||||
|
||||
result = await data_analyst.write_and_exec_code()
|
||||
|
||||
assert "Failed" in result
|
||||
assert mock_execute_code.run.call_count == 3 # Should retry 3 times
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_data_no_tasks(self, data_analyst):
|
||||
data_analyst.planner.plan.get_finished_tasks.return_value = []
|
||||
await data_analyst._check_data()
|
||||
assert not data_analyst.rc.working_memory.add.called
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_data_with_data_task(self, data_analyst, mock_execute_code):
|
||||
# Setup task with DATA_PREPROCESS type
|
||||
task = MagicMock()
|
||||
task.task_type = TaskType.DATA_PREPROCESS.type_name
|
||||
data_analyst.planner.plan.get_finished_tasks.return_value = [task]
|
||||
data_analyst.planner.plan.current_task = task
|
||||
|
||||
with patch('metagpt.actions.di.write_analysis_code.CheckData') as mock_check:
|
||||
mock_check.return_value.run = AsyncMock(return_value="check code")
|
||||
mock_execute_code.run.return_value = ("check result", True)
|
||||
|
||||
await data_analyst._check_data()
|
||||
|
||||
assert mock_check.return_value.run.called
|
||||
assert mock_execute_code.run.called
|
||||
data_analyst.rc.working_memory.add.assert_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_special_command(self, data_analyst):
|
||||
data_analyst.planner.plan.is_plan_finished.return_value = False
|
||||
|
||||
cmd = {"command_name": "end"}
|
||||
with patch.object(RoleZero, '_run_special_command', return_value="base result"):
|
||||
result = await data_analyst._run_special_command(cmd)
|
||||
|
||||
assert "All tasks are finished" in result
|
||||
assert data_analyst.planner.plan.finish_all_tasks.called
|
||||
|
||||
# Test non-end command
|
||||
cmd = {"command_name": "other"}
|
||||
with patch.object(RoleZero, '_run_special_command', return_value="base result"):
|
||||
result = await data_analyst._run_special_command(cmd)
|
||||
assert result == "base result"
|
||||
|
|
|
|||
265
tests/metagpt/roles/di/test_role_zero.py
Normal file
265
tests/metagpt/roles/di/test_role_zero.py
Normal file
|
|
@ -0,0 +1,265 @@
|
|||
from typing import List
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from metagpt.actions import UserRequirement
|
||||
from metagpt.roles.di.role_zero import RoleZero
|
||||
from metagpt.schema import Message, UserMessage, AIMessage
|
||||
from metagpt.tools.libs.browser import Browser
|
||||
|
||||
|
||||
class MockConfig:
|
||||
"""Mock configuration for RoleZero testing"""
|
||||
|
||||
class RoleZeroConfig:
|
||||
enable_longterm_memory = True
|
||||
longterm_memory_persist_path = "/tmp/test_memory"
|
||||
memory_k = 5
|
||||
similarity_top_k = 3
|
||||
use_llm_ranker = False
|
||||
|
||||
role_zero = RoleZeroConfig()
|
||||
|
||||
|
||||
class MockLLM:
|
||||
"""Mock LLM for testing"""
|
||||
|
||||
def __init__(self, responses: List[str] = None):
|
||||
self.responses = responses or ["Mock LLM Response"]
|
||||
self.response_index = 0
|
||||
|
||||
async def aask(self, *args, **kwargs):
|
||||
response = self.responses[self.response_index]
|
||||
self.response_index = (self.response_index + 1) % len(self.responses)
|
||||
return response
|
||||
|
||||
def support_image_input(self):
|
||||
return True
|
||||
|
||||
def format_msg(self, msgs):
|
||||
return msgs
|
||||
|
||||
|
||||
class MockToolRecommender:
|
||||
"""Mock tool recommender for testing"""
|
||||
|
||||
async def recommend_tools(self):
|
||||
return []
|
||||
|
||||
|
||||
class MockMemory:
|
||||
"""Mock memory for testing"""
|
||||
|
||||
def add(self, msg):
|
||||
pass
|
||||
|
||||
def get(self, k=None):
|
||||
return []
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_role_zero():
|
||||
"""Fixture providing a configured RoleZero instance for testing"""
|
||||
role = RoleZero()
|
||||
role.llm = MockLLM()
|
||||
role.config = MockConfig()
|
||||
role.tool_recommender = MockToolRecommender()
|
||||
role.rc.working_memory = MockMemory()
|
||||
role.rc.memory = MockMemory()
|
||||
return role
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_message():
|
||||
"""Fixture providing a test message"""
|
||||
return Message(content="Test message", role="user")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_model_validators(mock_role_zero):
|
||||
"""Test all model validators"""
|
||||
# Test set_plan_and_tool
|
||||
assert mock_role_zero.react_mode == "react"
|
||||
mock_role_zero = await mock_role_zero.set_plan_and_tool()
|
||||
assert mock_role_zero.planner is not None
|
||||
|
||||
# Test set_tool_execution
|
||||
mock_role_zero = await mock_role_zero.set_tool_execution()
|
||||
assert "Plan.append_task" in mock_role_zero.tool_execution_map
|
||||
assert "RoleZero.ask_human" in mock_role_zero.tool_execution_map
|
||||
|
||||
# Test set_longterm_memory
|
||||
mock_role_zero = await mock_role_zero.set_longterm_memory()
|
||||
assert mock_role_zero.rc.memory is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_think_react_cycle(mock_role_zero):
|
||||
"""Test the think-react cycle"""
|
||||
# Setup test conditions
|
||||
mock_role_zero.rc.todo = True
|
||||
mock_role_zero.planner.plan.goal = "Test goal"
|
||||
mock_role_zero.respond_language = "English"
|
||||
|
||||
# Test _think
|
||||
with patch('metagpt.roles.di.role_zero.ThoughtReporter'):
|
||||
result = await mock_role_zero._think()
|
||||
assert result is True
|
||||
|
||||
# Test _react
|
||||
mock_role_zero.rc.news = [Message(content="Test", cause_by=UserRequirement())]
|
||||
with patch.object(mock_role_zero, '_quick_think', return_value=(None, "TASK")):
|
||||
result = await mock_role_zero._react()
|
||||
assert isinstance(result, Message)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_command_parsing(mock_role_zero):
|
||||
"""Test command parsing functionality"""
|
||||
# Test valid JSON parsing
|
||||
valid_commands = '''[
|
||||
{"command_name": "Editor.read", "args": {"filename": "test.txt"}},
|
||||
{"command_name": "Plan.finish_current_task", "args": {}}
|
||||
]'''
|
||||
commands, ok, rsp = await mock_role_zero._parse_commands(valid_commands)
|
||||
assert ok is True
|
||||
assert len(commands) == 2
|
||||
|
||||
# Test invalid JSON
|
||||
invalid_commands = "Invalid JSON"
|
||||
with patch.object(mock_role_zero.llm, 'aask') as mock_aask:
|
||||
mock_aask.return_value = valid_commands
|
||||
commands, ok, rsp = await mock_role_zero._parse_commands(invalid_commands)
|
||||
assert ok is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_command_execution(mock_role_zero):
|
||||
"""Test command execution"""
|
||||
# Test special commands
|
||||
special_commands = [
|
||||
{"command_name": "Plan.finish_current_task", "args": {}},
|
||||
{"command_name": "end", "args": {}}
|
||||
]
|
||||
|
||||
with patch.object(mock_role_zero, '_run_special_command') as mock_special:
|
||||
mock_special.return_value = "Special command executed"
|
||||
result = await mock_role_zero._run_commands(special_commands)
|
||||
assert "Command Plan.finish_current_task executed" in result
|
||||
|
||||
# Test normal commands
|
||||
normal_commands = [
|
||||
{"command_name": "Editor.read", "args": {"filename": "test.txt"}}
|
||||
]
|
||||
with patch.object(mock_role_zero.editor, 'read', return_value="File content"):
|
||||
result = await mock_role_zero._run_commands(normal_commands)
|
||||
assert "Command Editor.read executed" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_message_handling(mock_role_zero):
|
||||
"""Test message parsing and handling"""
|
||||
# Test browser action parsing
|
||||
mock_browser = AsyncMock(spec=Browser)
|
||||
mock_browser.is_empty_page = False
|
||||
mock_browser.view.return_value = "Browser content"
|
||||
mock_role_zero.browser = mock_browser
|
||||
|
||||
browser_memory = [
|
||||
UserMessage(content="Command Browser.goto executed"),
|
||||
UserMessage(content="Other message")
|
||||
]
|
||||
result = await mock_role_zero.parse_browser_actions(browser_memory)
|
||||
assert len(result) == 3
|
||||
|
||||
# Test editor result parsing
|
||||
editor_memory = [
|
||||
UserMessage(content="Command Editor.read executed: content"),
|
||||
UserMessage(content="Normal message")
|
||||
]
|
||||
result = await mock_role_zero.parse_editor_result(editor_memory)
|
||||
assert len(result) == 2
|
||||
|
||||
# Test image parsing
|
||||
image_memory = [
|
||||
UserMessage(content="Message with "),
|
||||
UserMessage(content="Normal message")
|
||||
]
|
||||
result = mock_role_zero.parse_images(image_memory)
|
||||
assert len(result) == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_error_cases(mock_role_zero):
|
||||
"""Test error handling in various scenarios"""
|
||||
# Test invalid command execution
|
||||
invalid_commands = [
|
||||
{"command_name": "NonExistentCommand", "args": {}}
|
||||
]
|
||||
result = await mock_role_zero._run_commands(invalid_commands)
|
||||
assert "Command NonExistentCommand not found" in result
|
||||
|
||||
# Test command parsing with malformed JSON
|
||||
malformed_json = '[{"command_name": "test", "args": {}]' # Missing closing brace
|
||||
with patch.object(mock_role_zero.llm, 'aask') as mock_aask:
|
||||
mock_aask.return_value = '[{"command_name": "fixed", "args": {}}]' # Valid JSON response
|
||||
commands, ok, rsp = await mock_role_zero._parse_commands(malformed_json)
|
||||
assert ok is True
|
||||
|
||||
# Test command parsing with improper command structure
|
||||
invalid_format = '[{"not_a_command": true}]' # Valid JSON but wrong format
|
||||
with patch.object(mock_role_zero.llm, 'aask') as mock_aask:
|
||||
mock_aask.return_value = invalid_format
|
||||
commands, ok, rsp = await mock_role_zero._parse_commands(invalid_format)
|
||||
assert ok is False
|
||||
|
||||
# Test think with no todo
|
||||
mock_role_zero.rc.todo = False
|
||||
result = await mock_role_zero._think()
|
||||
assert result is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_special_commands(mock_role_zero):
|
||||
"""Test special command handling"""
|
||||
# Test Plan.finish_current_task
|
||||
finish_command = {"command_name": "Plan.finish_current_task", "args": {}}
|
||||
result = await mock_role_zero._run_special_command(finish_command)
|
||||
assert "Current task is finished" in result
|
||||
|
||||
# Test end command
|
||||
end_command = {"command_name": "end", "args": {}}
|
||||
with patch.object(mock_role_zero.llm, 'aask', return_value="Summary"):
|
||||
result = await mock_role_zero._run_special_command(end_command)
|
||||
assert result
|
||||
|
||||
# Test ask_human command
|
||||
ask_command = {"command_name": "RoleZero.ask_human", "args": {"question": "Test?"}}
|
||||
result = await mock_role_zero._run_special_command(ask_command)
|
||||
assert "Not in MGXEnv" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_quick_think(mock_role_zero):
|
||||
"""Test quick think functionality"""
|
||||
mock_role_zero.rc.news = [Message(content="Test", cause_by=UserRequirement())]
|
||||
|
||||
with patch.object(mock_role_zero.llm, 'aask') as mock_aask:
|
||||
mock_aask.side_effect = ["QUICK", "Quick response"]
|
||||
result, intent = await mock_role_zero._quick_think()
|
||||
assert isinstance(result, AIMessage)
|
||||
assert intent == "QUICK"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_experience_retrieval(mock_role_zero):
|
||||
"""Test experience retrieval functionality"""
|
||||
# Test with empty memory
|
||||
result = mock_role_zero._retrieve_experience()
|
||||
assert isinstance(result, str)
|
||||
|
||||
# Test with mock experience retriever
|
||||
mock_role_zero.experience_retriever.retrieve = MagicMock(return_value="Test experience")
|
||||
result = mock_role_zero._retrieve_experience()
|
||||
assert result == "Test experience"
|
||||
143
tests/metagpt/roles/di/test_swe_agent.py
Normal file
143
tests/metagpt/roles/di/test_swe_agent.py
Normal file
|
|
@ -0,0 +1,143 @@
|
|||
import json
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from metagpt.roles.di.swe_agent import SWEAgent
|
||||
from metagpt.schema import Message
|
||||
from metagpt.tools.libs.terminal import Bash
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_terminal():
|
||||
terminal = AsyncMock(spec=Bash)
|
||||
terminal.run = AsyncMock()
|
||||
return terminal
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_extract_patch():
|
||||
with patch('metagpt.tools.swe_agent_commands.swe_agent_utils.extract_patch') as mock:
|
||||
mock.return_value = 'test_patch'
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def swe_agent(mock_terminal):
|
||||
agent = SWEAgent()
|
||||
agent.terminal = mock_terminal
|
||||
# Mock super()._think and super()._act
|
||||
agent._think = AsyncMock(return_value=True)
|
||||
agent._act = AsyncMock(return_value=Message(content='test'))
|
||||
return agent
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_initialization():
|
||||
"""Test SWEAgent initialization and attributes"""
|
||||
agent = SWEAgent()
|
||||
assert agent.name == 'Swen'
|
||||
assert agent.profile == 'Issue Solver'
|
||||
assert isinstance(agent.terminal, Bash)
|
||||
assert agent.output_diff == ''
|
||||
assert agent.max_react_loop == 40
|
||||
assert agent.run_eval is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_think(swe_agent):
|
||||
"""Test _think method with mocked dependencies"""
|
||||
# Mock _format_instruction
|
||||
swe_agent._format_instruction = AsyncMock()
|
||||
|
||||
result = await swe_agent._think()
|
||||
assert result is True
|
||||
swe_agent._format_instruction.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_format_instruction(swe_agent):
|
||||
"""Test _format_instruction with mocked terminal response"""
|
||||
mock_state = {"key": "value"}
|
||||
swe_agent.terminal.run.return_value = json.dumps(mock_state)
|
||||
|
||||
await swe_agent._format_instruction()
|
||||
swe_agent.terminal.run.assert_called_with('state')
|
||||
assert isinstance(swe_agent.cmd_prompt_current_state, str)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_format_instruction_error(swe_agent):
|
||||
"""Test _format_instruction with invalid JSON response"""
|
||||
swe_agent.terminal.run.return_value = 'invalid json'
|
||||
|
||||
with pytest.raises(json.JSONDecodeError):
|
||||
await swe_agent._format_instruction()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_act_with_eval(swe_agent):
|
||||
"""Test _act method with run_eval=True"""
|
||||
swe_agent.run_eval = True
|
||||
swe_agent._parse_commands_for_eval = AsyncMock()
|
||||
|
||||
result = await swe_agent._act()
|
||||
assert isinstance(result, Message)
|
||||
swe_agent._parse_commands_for_eval.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_act_without_eval(swe_agent):
|
||||
"""Test _act method with run_eval=False"""
|
||||
swe_agent.run_eval = False
|
||||
swe_agent._parse_commands_for_eval = AsyncMock()
|
||||
|
||||
result = await swe_agent._act()
|
||||
assert isinstance(result, Message)
|
||||
swe_agent._parse_commands_for_eval.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_commands_for_eval_with_diff(swe_agent, mock_extract_patch):
|
||||
"""Test _parse_commands_for_eval with git diff output"""
|
||||
swe_agent.rc.todo = False
|
||||
swe_agent.terminal.run.return_value = 'test diff output'
|
||||
|
||||
await swe_agent._parse_commands_for_eval()
|
||||
assert swe_agent.output_diff == 'test_patch'
|
||||
mock_extract_patch.assert_called_with('test diff output')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_commands_for_eval_with_error(swe_agent):
|
||||
"""Test _parse_commands_for_eval error handling"""
|
||||
swe_agent.rc.todo = False
|
||||
swe_agent.terminal.run.side_effect = Exception('test error')
|
||||
|
||||
await swe_agent._parse_commands_for_eval()
|
||||
assert swe_agent.output_diff == ''
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_commands_for_eval_with_todo(swe_agent):
|
||||
"""Test _parse_commands_for_eval when todo is True"""
|
||||
swe_agent.rc.todo = True
|
||||
|
||||
await swe_agent._parse_commands_for_eval()
|
||||
swe_agent.terminal.run.assert_not_called()
|
||||
|
||||
|
||||
def test_retrieve_experience(swe_agent):
|
||||
"""Test _retrieve_experience returns MINIMAL_EXAMPLE"""
|
||||
from metagpt.prompts.di.swe_agent import MINIMAL_EXAMPLE
|
||||
|
||||
result = swe_agent._retrieve_experience()
|
||||
assert result == MINIMAL_EXAMPLE
|
||||
|
||||
|
||||
def test_update_tool_execution(swe_agent):
|
||||
"""Test _update_tool_execution adds required tools"""
|
||||
swe_agent._update_tool_execution()
|
||||
|
||||
assert 'Bash.run' in swe_agent.tool_execution_map
|
||||
assert 'git_create_pull' in swe_agent.tool_execution_map
|
||||
assert swe_agent.tool_execution_map['Bash.run'] == swe_agent.terminal.run
|
||||
|
|
@ -7,56 +7,217 @@
|
|||
"""
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
import json
|
||||
|
||||
import pytest
|
||||
from pydantic import Field
|
||||
from pydantic import BaseModel, Field
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
from metagpt.actions import DebugError, RunCode, WriteTest
|
||||
from metagpt.actions import DebugError, RunCode, WriteTest, UserRequirement
|
||||
from metagpt.actions.summarize_code import SummarizeCode
|
||||
from metagpt.actions.prepare_documents import PrepareDocuments
|
||||
from metagpt.environment import Environment
|
||||
from metagpt.roles import QaEngineer
|
||||
from metagpt.schema import Message
|
||||
from metagpt.schema import Message, AIMessage, Document, RunCodeContext, TestingContext
|
||||
from metagpt.utils.common import any_to_str, aread, awrite
|
||||
from metagpt.utils.project_repo import ProjectRepo
|
||||
|
||||
|
||||
async def test_qa(context):
|
||||
# Prerequisites
|
||||
demo_path = Path(__file__).parent / "../../data/demo_project"
|
||||
context.src_workspace = Path(context.repo.workdir) / "qa/game_2048"
|
||||
data = await aread(filename=demo_path / "game.py", encoding="utf-8")
|
||||
await awrite(filename=context.src_workspace / "game.py", data=data, encoding="utf-8")
|
||||
await awrite(filename=Path(context.repo.workdir) / "requirements.txt", data="")
|
||||
class MockProjectRepo:
|
||||
def __init__(self, workdir):
|
||||
self.workdir = Path(workdir)
|
||||
self.srcs = MockRepoFiles(self.workdir / "src")
|
||||
self.tests = MockRepoFiles(self.workdir / "tests")
|
||||
self.test_outputs = MockRepoFiles(self.workdir / "test_outputs")
|
||||
self.src_relative_path = None
|
||||
|
||||
class MockEnv(Environment):
|
||||
msgs: List[Message] = Field(default_factory=list)
|
||||
def with_src_path(self, path):
|
||||
self.src_relative_path = path
|
||||
return self
|
||||
|
||||
def publish_message(self, message: Message, peekable: bool = True) -> bool:
|
||||
self.msgs.append(message)
|
||||
return True
|
||||
|
||||
class MockRepoFiles:
|
||||
def __init__(self, workdir):
|
||||
self.workdir = Path(workdir)
|
||||
self.root_path = workdir
|
||||
self.changed_files = {}
|
||||
|
||||
async def get(self, filename):
|
||||
if filename.endswith(".py"):
|
||||
return Document(root_path=str(self.root_path), filename=filename, content="def test(): pass")
|
||||
return None
|
||||
|
||||
async def save(self, filename, content, dependencies=None):
|
||||
self.changed_files[filename] = content
|
||||
return Document(root_path=str(self.root_path), filename=filename, content=content)
|
||||
|
||||
async def save_doc(self, doc, dependencies=None):
|
||||
self.changed_files[doc.filename] = doc.content
|
||||
return doc
|
||||
|
||||
|
||||
class MockContext(BaseModel):
|
||||
reqa_file: str = None
|
||||
src_workspace: Path = None
|
||||
git_repo: BaseModel = None
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
|
||||
class MockGitRepo:
|
||||
def __init__(self, workdir):
|
||||
self.workdir = workdir
|
||||
self.name = "test_project"
|
||||
|
||||
|
||||
class MockEnv(Environment):
|
||||
msgs: List[Message] = Field(default_factory=list)
|
||||
|
||||
def publish_message(self, message: Message, peekable: bool = True) -> bool:
|
||||
self.msgs.append(message)
|
||||
return True
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_env():
|
||||
return MockEnv()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_repo(tmp_path):
|
||||
return MockProjectRepo(tmp_path)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def qa_engineer(mock_env, mock_repo):
|
||||
role = QaEngineer()
|
||||
role.set_env(mock_env)
|
||||
role.repo = mock_repo
|
||||
return role
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_initialization():
|
||||
"""Test QaEngineer initialization"""
|
||||
role = QaEngineer()
|
||||
assert role.name == "Edward"
|
||||
assert role.profile == "QaEngineer"
|
||||
assert role.test_round == 0
|
||||
assert role.test_round_allowed == 5
|
||||
assert not role.enable_memory
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_test(qa_engineer, mock_repo):
|
||||
"""Test _write_test method"""
|
||||
context = MockContext(reqa_file="test.py")
|
||||
qa_engineer.context = context
|
||||
message = Message(content="Test content")
|
||||
|
||||
await qa_engineer._write_test(message)
|
||||
|
||||
assert "test_test.py" in mock_repo.tests.changed_files
|
||||
assert mock_repo.tests.changed_files["test_test.py"] is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_code(qa_engineer, mock_repo):
|
||||
"""Test _run_code method"""
|
||||
context = RunCodeContext(
|
||||
command=["python", "test_sample.py"],
|
||||
code_filename="sample.py",
|
||||
test_filename="test_sample.py",
|
||||
working_directory=str(mock_repo.workdir),
|
||||
)
|
||||
message = Message(content=context.model_dump_json())
|
||||
|
||||
await qa_engineer._run_code(message)
|
||||
|
||||
assert "test_sample.py.json" in mock_repo.test_outputs.changed_files
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_parse_user_requirement(qa_engineer):
|
||||
"""Test _parse_user_requirement method"""
|
||||
qa_engineer.git_repo = MockGitRepo(Path("/test/path"))
|
||||
message = Message(
|
||||
content="Create test for game.py",
|
||||
cause_by=any_to_str(UserRequirement)
|
||||
)
|
||||
|
||||
result = await qa_engineer._parse_user_requirement(message)
|
||||
assert isinstance(result, AIMessage)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_think_with_summarize_code(qa_engineer):
|
||||
"""Test _think method with SummarizeCode message"""
|
||||
|
||||
class MockArgs(BaseModel):
|
||||
project_path: str = "/test/path"
|
||||
|
||||
message = Message(
|
||||
content="Test content",
|
||||
cause_by=any_to_str(SummarizeCode),
|
||||
instruct_content=MockArgs()
|
||||
)
|
||||
qa_engineer.rc.news = [message]
|
||||
|
||||
result = await qa_engineer._think()
|
||||
assert result is True
|
||||
assert qa_engineer.input_args is not None
|
||||
assert qa_engineer.repo is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_act_exceeding_rounds(qa_engineer):
|
||||
"""Test _act method when exceeding test rounds"""
|
||||
qa_engineer.test_round = 6
|
||||
qa_engineer.input_args = BaseModel()
|
||||
|
||||
result = await qa_engineer._act()
|
||||
assert isinstance(result, AIMessage)
|
||||
assert "Exceeding" in result.content
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_qa_full_workflow(tmp_path):
|
||||
"""Test the full QA workflow"""
|
||||
# Setup mock context and environment
|
||||
git_repo = MockGitRepo(tmp_path)
|
||||
context = MockContext(src_workspace=tmp_path / "qa/game_2048", git_repo=git_repo)
|
||||
context.src_workspace.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Create mock game.py file
|
||||
await awrite(filename=context.src_workspace / "game.py", data="def test(): pass", encoding="utf-8")
|
||||
await awrite(filename=tmp_path / "requirements.txt", data="")
|
||||
|
||||
# Setup QA engineer
|
||||
env = MockEnv()
|
||||
|
||||
role = QaEngineer(context=context)
|
||||
role.set_env(env)
|
||||
role.repo = MockProjectRepo(tmp_path)
|
||||
|
||||
# Test full workflow
|
||||
await role.run(with_message=Message(content="", cause_by=SummarizeCode))
|
||||
assert env.msgs
|
||||
assert env.msgs[0].cause_by == any_to_str(WriteTest)
|
||||
|
||||
msg = env.msgs[0]
|
||||
env.msgs.clear()
|
||||
await role.run(with_message=msg)
|
||||
assert env.msgs
|
||||
assert env.msgs[0].cause_by == any_to_str(RunCode)
|
||||
|
||||
msg = env.msgs[0]
|
||||
env.msgs.clear()
|
||||
await role.run(with_message=msg)
|
||||
assert env.msgs
|
||||
assert env.msgs[0].cause_by == any_to_str(DebugError)
|
||||
|
||||
msg = env.msgs[0]
|
||||
env.msgs.clear()
|
||||
role.test_round_allowed = 1
|
||||
rsp = await role.run(with_message=msg)
|
||||
assert "Exceeding" in rsp.content
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-s"])
|
||||
assert "Exceeding" in rsp.content
|
||||
Loading…
Add table
Add a link
Reference in a new issue