Merge pull request #1669 from HuiDBK/feat_tool_ut

Feat tool ut
This commit is contained in:
better629 2025-02-12 10:52:01 +08:00 committed by GitHub
commit 6fa4cd0b5a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 377 additions and 120 deletions

View file

@ -37,7 +37,7 @@ class GPTvGenerator:
It utilizes a vision model to analyze the layout from an image and generate webpage codes accordingly.
"""
def __init__(self, config: Optional[Config]):
def __init__(self, config: Optional[Config] = None):
"""Initialize GPTvGenerator class with default values from the configuration."""
from metagpt.llm import LLM

View file

@ -1,90 +1,122 @@
from unittest.mock import AsyncMock, patch
import pytest
import pytest_asyncio
from metagpt.const import TEST_DATA_PATH
from metagpt.tools.libs.browser import Browser, get_scroll_position
from metagpt.tools.libs.browser import Browser
TEST_URL = "https://docs.deepwisdom.ai/main/en/guide/get_started/quickstart.html"
TEST_SCREENSHOT_PATH = TEST_DATA_PATH / "screenshot.png"
@pytest.fixture(autouse=True)
def llm_mock(rsp_cache, mocker, request):
# An empty fixture to overwrite the global llm_mock fixture
# because in provider folder, we want to test the aask and aask functions for the specific models
pass
@pytest.fixture
def browser():
browser_instance = Browser()
yield browser_instance
@pytest.mark.asyncio
async def test_open_and_switch_page(browser):
await browser.start()
class TestBrowser:
test_url = "https://juejin.cn/"
await browser.open_new_page("https://baidu.com")
await browser.open_new_page("https://tencent.com")
assert browser.current_page_url == "https://tencent.com"
await browser.switch_page("https://baidu.com")
assert browser.current_page_url == "https://baidu.com"
@pytest_asyncio.fixture(autouse=True)
async def browser_client(self):
"""Setup before each test case."""
print("browser_client")
browser = await self.async_setup()
yield browser
await self.browser.stop()
await browser.close()
async def async_setup(self):
self.browser = Browser(headless=True)
await self.browser.start()
return self.browser
async def async_teardown(self):
"""Teardown after each test case."""
await self.browser.stop()
@pytest.mark.asyncio
async def test_search(browser):
await browser.start()
async def test_start_and_stop(self):
"""Test browser start and stop functionality."""
assert self.browser.playwright is not None
assert self.browser.browser_instance is not None
assert self.browser.browser_ctx is not None
assert self.browser.page is not None
# search all
await browser.open_new_page(TEST_URL)
search_term = "startup example"
search_results = await browser.search_content_all(search_term)
print(search_results)
# expected search result as of 20240410:
# [{'index': 0, 'content': {'text_block': 'Below is a breakdown of the software startup example. If you install MetaGPT with the git clone approach, simply run', 'links': [{'text': 'software startup example', 'href': 'https://github.com/geekan/MetaGPT/blob/main/metagpt/software_company.py'}]}, 'position': {'from_top': 640, 'from_left': 225}, 'element_obj': <Locator frame=<Frame name= url='https://docs.deepwisdom.ai/main/en/guide/get_started/quickstart.html'> selector='text=startup example >> nth=0'>}]
first_result = search_results[0]["content"]
assert "software startup example" in first_result["text_block"]
assert first_result["links"]
assert first_result["links"][0]["href"] == "https://github.com/geekan/MetaGPT/blob/main/metagpt/software_company.py"
assert search_results[0]["position"]
await self.async_teardown()
# scroll to search result
await browser.scroll_to_search_result(search_results, index=0)
assert self.browser.playwright is None
assert self.browser.browser_instance is None
assert self.browser.browser_ctx is None
await browser.close()
async def test_goto(self):
"""Test navigating to a URL."""
mock_reporter = AsyncMock()
self.browser.reporter = mock_reporter
result = await self.browser.goto(self.test_url)
assert "SUCCESS" in result
assert self.test_url in self.browser.page.url
# @pytest.mark.asyncio
# async def test_find_links(browser):
# await browser.start()
@patch("metagpt.tools.libs.browser.click_element", new_callable=AsyncMock)
async def test_click(self, mock_click_element):
"""Test clicking on an element."""
self.browser.accessibility_tree = [
{"nodeId": "1", "backendDOMNodeId": 101, "name": "Button"},
{"nodeId": "2", "backendDOMNodeId": 102, "name": "Input"},
]
self.browser.page = AsyncMock()
# await browser.open_new_page(TEST_URL)
# link_info = await browser.find_links()
# assert link_info
await self.browser.click(1)
# await browser.close()
mock_click_element.assert_called_once()
@patch("metagpt.tools.libs.browser.click_element", new_callable=AsyncMock)
@patch("metagpt.tools.libs.browser.type_text", new_callable=AsyncMock)
async def test_type(self, mock_type_text, mock_click_element):
"""Test typing text into an input field."""
content = "Hello, world!"
self.browser.accessibility_tree = [
{"nodeId": "1", "backendDOMNodeId": 101, "name": "Button"},
{"nodeId": "2", "backendDOMNodeId": 102, "name": "Input"},
]
self.browser.page = AsyncMock()
@pytest.mark.asyncio
async def test_scroll(browser):
await browser.start()
await self.browser.type(1, content)
await browser.open_new_page(TEST_URL)
mock_click_element.assert_called_once()
mock_type_text.assert_called_once_with(self.browser.page, content)
await browser.scroll_current_page(offset=-500)
assert await get_scroll_position(browser.current_page) == {"x": 0, "y": 0} # no change if you scrol up from top
initial_view = await browser._view()
@patch("metagpt.tools.libs.browser.key_press", new_callable=AsyncMock)
@patch("metagpt.tools.libs.browser.hover_element", new_callable=AsyncMock)
async def test_hover_press(self, mock_hover_element, mock_key_press):
"""Test Hover and press key"""
self.browser.accessibility_tree = [
{"nodeId": "1", "backendDOMNodeId": 101, "name": "Button"},
{"nodeId": "2", "backendDOMNodeId": 102, "name": "Input"},
]
self.browser.page = AsyncMock()
await browser.scroll_current_page(offset=500) # scroll down
assert await get_scroll_position(browser.current_page) == {"x": 0, "y": 500}
scrolled_view = await browser._view()
key_comb = "Enter"
await self.browser.hover(1)
await self.browser.press(key_comb)
mock_hover_element.assert_called_once()
mock_key_press.assert_called_once_with(self.browser.page, key_comb)
assert initial_view != scrolled_view
async def test_scroll(self):
"""Scroll the page up or down."""
await self.browser.scroll("down")
await self.browser.scroll("up")
await browser.scroll_current_page(offset=-200) # scroll up
assert await get_scroll_position(browser.current_page) == {"x": 0, "y": 300}
async def test_go_back_and_forward(self):
await self.browser.go_back()
await self.browser.go_forward()
await browser.close()
async def test_tab_focus(self):
await self.browser.tab_focus(0)
async def test_close_tab(self):
"""Test closing a tab."""
mock_close = AsyncMock()
self.browser.page = AsyncMock()
self.browser.page.close = mock_close
await self.browser.close_tab()
mock_close.assert_called_once()

View file

@ -0,0 +1,92 @@
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import pytest_asyncio
from metagpt.tools.libs.cr import CodeReview
class MockFile:
def __init__(self, content):
self.content = content
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass
async def read(self):
return self.content
@pytest.mark.asyncio
class TestCodeReview:
@pytest_asyncio.fixture(autouse=True)
async def setup(self):
"""Fixture to initialize the CodeReview instance."""
self.cr = CodeReview()
@patch("aiofiles.open", new_callable=MagicMock)
@patch("metagpt.utils.report.EditorReporter.async_report", new_callable=AsyncMock)
@patch("metagpt.ext.cr.actions.code_review.CodeReview.run", new_callable=AsyncMock)
async def test_review(self, mock_run, mock_report, mock_aiofiles_open):
"""Test the review method with a local patch file."""
# mock patch_content
patch_content = """diff --git a/test.py b/test.py
index 1234567..89abcde 100644
--- a/test.py
+++ b/test.py
@@ -1,3 +1,3 @@
def foo():
- print("Hello")
+ print("World")
- print("Another line")
+ print("Another modified line")"""
# mock point file content
point_file_content = json.dumps([{"id": 1, "description": "Test point"}])
mock_patch_file = MockFile(patch_content)
mock_point_file = MockFile(point_file_content)
mock_aiofiles_open.side_effect = [mock_patch_file, mock_point_file]
mock_run.return_value = [{"comment": "Fix this line"}]
# run
result = await self.cr.review(patch_path="test.patch", output_file="output.json")
# assert
assert "The number of defects: 1" in result
mock_run.assert_called_once()
mock_report.assert_called()
@patch("aiofiles.open", new_callable=MagicMock)
@patch("metagpt.ext.cr.actions.modify_code.ModifyCode.run", new_callable=AsyncMock)
async def test_fix(self, mock_run, mock_aiofiles_open):
"""Test the fix method."""
patch_content = """diff --git a/test.py b/test.py
index 1234567..89abcde 100644
--- a/test.py
+++ b/test.py
@@ -1,3 +1,3 @@
def foo():
- print("Hello")
+ print("World")
- print("Another line")
+ print("Another modified line")"""
cr_file_content = json.dumps([{"comment": "Fix this line"}])
# mock file obj
mock_path_file = MockFile(patch_content)
mock_cr_file = MockFile(cr_file_content)
mock_aiofiles_open.side_effect = [mock_path_file, mock_cr_file]
# run fix
result = await self.cr.fix(patch_path="test.patch", cr_file="cr.json", output_dir="output")
# assert
assert "The fixed patch files store in output" in result
mock_run.assert_called_once()

View file

@ -130,12 +130,8 @@ def test_insert_content(temp_py_file):
@pytest.mark.parametrize(
"filename",
[
TEST_DATA_PATH / "requirements/1.txt",
TEST_DATA_PATH / "requirements/1.json",
TEST_DATA_PATH / "requirements/1.constraint.md",
TEST_DATA_PATH / "requirements/pic/1.png",
TEST_DATA_PATH / "docx_for_test.docx",
TEST_DATA_PATH / "requirements/2.pdf",
TEST_DATA_PATH / "output_parser/1.md",
TEST_DATA_PATH / "search/serper-metagpt-8.json",
TEST_DATA_PATH / "audio/hello.mp3",
TEST_DATA_PATH / "code/python/1.py",
TEST_DATA_PATH / "code/js/1.js",
@ -264,12 +260,6 @@ def test_open_file_long_with_lineno(temp_file_path):
assert result.split("\n") == expected.split("\n")
def test_create_file_unexist_path():
editor = Editor()
with pytest.raises(FileNotFoundError):
editor.create_file("/unexist/path/a.txt")
@pytest.mark.asyncio
async def test_create_file(temp_file_path):
editor = Editor()
@ -578,15 +568,16 @@ Pay attention to the new content. Ensure that it aligns with the new parameters.
def test_edit_file_by_replace_mismatch(temp_py_file):
editor = Editor()
output = editor.edit_file_by_replace(
file_name=str(temp_py_file),
first_replaced_line_number=5,
first_replaced_line_content="",
new_content=" b = 9",
last_replaced_line_number=5,
last_replaced_line_content="",
)
assert output.strip() == MISMATCH_ERROR.strip()
with pytest.raises(ValueError) as match_error:
editor.edit_file_by_replace(
file_name=str(temp_py_file),
first_replaced_line_number=5,
first_replaced_line_content="",
new_content=" b = 9",
last_replaced_line_number=5,
last_replaced_line_content="",
)
assert str(match_error.value).strip() == MISMATCH_ERROR.strip()
def test_append_file(temp_file_path):

View file

@ -0,0 +1,64 @@
import os
from unittest.mock import AsyncMock
import pytest
from metagpt.tools.libs.env import (
EnvKeyNotFoundError,
default_get_env_description,
get_env,
get_env_default,
set_get_env_entry,
)
@pytest.mark.asyncio
class TestEnv:
@pytest.fixture(autouse=True)
def setup_and_teardown(self):
"""Setup and teardown for environment variables."""
self.mock_os_env = {
"TEST_APP-KEY": "value1",
"TEST_APP_KEY": "value2",
}
os.environ.update(self.mock_os_env)
yield
# Clear added environment variables
for key in self.mock_os_env.keys():
del os.environ[key]
async def test_get_env(self):
"""Test retrieving an environment variable."""
result = await get_env("KEY", app_name="TEST_APP")
assert result == "value1"
with pytest.raises(EnvKeyNotFoundError):
await get_env("NON_EXISTENT_KEY")
# Using no app_name
result = await get_env("TEST_APP_KEY")
assert result == "value2"
async def test_get_env_default(self):
"""Test retrieving environment variable with default value."""
result = await get_env_default("NON_EXISTENT_KEY", app_name="TEST_APP", default_value="default")
assert result == "default"
async def test_get_env_description(self):
"""Test retrieving descriptions for environment variables."""
descriptions = await default_get_env_description()
assert 'await get_env(key="KEY", app_name="TEST_APP")' in descriptions
assert (
descriptions['await get_env(key="KEY", app_name="TEST_APP")']
== "Return the value of environment variable `TEST_APP-KEY`."
)
async def test_set_get_env_entry(self):
"""Test overriding get_env functionality."""
mock_get_env_value = "mocked_value"
mock_func = AsyncMock(return_value=mock_get_env_value)
set_get_env_entry(mock_func, default_get_env_description)
result = await get_env("set_get_env")
assert result == mock_get_env_value

View file

@ -12,7 +12,6 @@ from pydantic import BaseModel
from metagpt.context import Context
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.schema import UserMessage
from metagpt.tools.libs.git import git_checkout, git_clone
from metagpt.utils.common import awrite
from metagpt.utils.git_repository import GitRepository
@ -26,21 +25,6 @@ async def get_env(key: str, app_name: str = ""):
return os.environ.get(key)
@pytest.mark.asyncio
@pytest.mark.parametrize(
["url", "commit_id"], [("https://github.com/sqlfluff/sqlfluff.git", "d19de0ecd16d298f9e3bfb91da122734c40c01e5")]
)
@pytest.mark.skip
async def test_git(url: str, commit_id: str):
repo_dir = await git_clone(url)
assert repo_dir
await git_checkout(repo_dir, commit_id)
repo = GitRepository(repo_dir, auto_init=False)
repo.delete_repository()
@pytest.mark.skip
@pytest.mark.asyncio
async def test_login():

View file

@ -0,0 +1,39 @@
from pathlib import Path
from unittest.mock import AsyncMock, patch
import pytest
import pytest_asyncio
from metagpt.tools.libs.image_getter import ImageGetter
@pytest.mark.asyncio
class TestImageGetter:
@pytest_asyncio.fixture(autouse=True)
async def image_getter_client(self):
"""Fixture to initialize the ImageGetter."""
self.image_getter = ImageGetter(headless=True)
await self.image_getter.start()
yield self.image_getter
if self.image_getter.browser_instance:
await self.image_getter.browser_instance.close()
@patch("metagpt.tools.libs.image_getter.decode_image")
async def test_get_image_success(self, mock_decode_image):
"""Test successfully retrieving and saving an image."""
search_term = "nature"
image_save_path = Path.cwd() / "test_image_getter.jpg"
# Mock the decode_image to avoid actual image decoding
mock_image = AsyncMock()
mock_decode_image.return_value = mock_image
# Mock the Playwright page evaluation result to return a dummy base64 image string
self.image_getter.page.goto = AsyncMock()
self.image_getter.page.wait_for_selector = AsyncMock()
self.image_getter.page.evaluate = AsyncMock(return_value="data:image/png;base64,FAKEBASE64STRING")
result = await self.image_getter.get_image(search_term, str(image_save_path))
assert f"{search_term} found." in result
mock_decode_image.assert_called_once()

View file

@ -46,7 +46,7 @@ async def test_index_repo(path, query):
],
)
def test_classify_path(paths, path_type, root):
result, result_root = IndexRepo.classify_path(paths)
result, result_root = IndexRepo.find_index_repo_path(paths)
assert path_type in set(result.keys())
assert root == result_root.get(path_type, "")

View file

@ -0,0 +1,64 @@
import tempfile
from pathlib import Path
import pytest
from metagpt.tools.libs.linter import Linter, LintResult
def test_linter_initialization():
linter = Linter(encoding="utf-8", root="/test/root")
assert linter.encoding == "utf-8"
assert linter.root == "/test/root"
assert "python" in linter.languages
assert callable(linter.languages["python"])
def test_get_abs_fname():
linter = Linter(root="/test/root")
abs_path = linter.get_abs_fname("test_file.py")
assert abs_path == linter.get_rel_fname("test_file.py")
def test_py_lint():
linter = Linter()
code = "print('Hello, World!')"
test_file_path = str(Path(__file__).resolve())
result = linter.py_lint(test_file_path, test_file_path, code)
assert result is None # No errors expected for valid Python code
def test_lint_with_python_file():
linter = Linter()
with tempfile.NamedTemporaryFile(suffix=".py", delete=True) as temp_file:
temp_file.write(b"def hello():\nprint('Hello')\n") # IndentationError
temp_file.flush()
result = linter.lint(temp_file.name)
assert isinstance(result, LintResult)
assert "IndentationError" in result.text
assert len(result.lines) > 0
def test_lint_with_unsupported_language():
linter = Linter()
with tempfile.NamedTemporaryFile(suffix=".unsupported", delete=True) as temp_file:
temp_file.write(b"This is unsupported code.")
temp_file.flush()
result = linter.lint(temp_file.name)
assert result is None # Unsupported language should return None
def test_run_cmd():
linter = Linter()
with tempfile.NamedTemporaryFile(suffix=".py", delete=True) as temp_file:
temp_file.write(b"print('Hello, World!')\n")
temp_file.flush()
result = linter.run_cmd("flake8", temp_file.name, "print('Hello, World!')")
# Since flake8 might not be installed in the test environment, we just ensure no exception is raised
assert result is None or isinstance(result, LintResult)
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -14,7 +14,8 @@ from metagpt.tools.libs.shell import shell_execute
],
)
async def test_shell(command, expect_stdout, expect_stderr):
stdout, stderr = await shell_execute(command)
stdout, stderr, returncode = await shell_execute(command)
assert returncode == 0
assert expect_stdout in stdout
assert stderr == expect_stderr

View file

@ -1,24 +1,14 @@
import pytest
from metagpt.tools.libs.web_scraping import scrape_web_playwright
from metagpt.tools.libs.web_scraping import view_page_element_to_scrape
@pytest.mark.asyncio
async def test_scrape_web_playwright(http_server):
server, test_url = await http_server()
result = await scrape_web_playwright(test_url)
# Assert that the result is a dictionary
assert isinstance(result, dict)
# Assert that the result contains 'inner_text' and 'html' keys
assert "inner_text" in result
assert "html" in result
# Assert startswith and endswith
assert not result["inner_text"].startswith(" ")
assert not result["inner_text"].endswith(" ")
assert not result["html"].startswith(" ")
assert not result["html"].endswith(" ")
await server.stop()
async def test_view_page_element_to_scrape():
# Define the test URL and parameters
test_url = "https://docs.deepwisdom.ai/main/zh/"
test_requirement = "Retrieve all paragraph texts"
test_keep_links = True
test_page = await view_page_element_to_scrape(test_url, test_requirement, test_keep_links)
assert isinstance(test_page, str)
assert "html" in test_page