This commit is contained in:
Leon 2023-08-10 12:00:36 -07:00
commit 46ada5a7f9
31 changed files with 1489 additions and 275 deletions

View file

@ -0,0 +1,32 @@
from pathlib import Path
from random import random
from tempfile import TemporaryDirectory
import pytest
from metagpt.roles import researcher
async def mock_llm_ask(self, prompt: str, system_msgs=None) -> str:
    """Stand-in for the LLM ``aask`` coroutine that returns canned answers.

    The reply is chosen by looking for a marker phrase inside ``prompt``.
    Markers are checked in the order written below, the first match wins,
    and a prompt containing none of them yields an empty string.

    Args:
        self: Receiver of the patched method; never read.
        prompt: Prompt text that would have been sent to the model.
        system_msgs: Accepted for call compatibility and ignored. Defaults to
            ``None`` so the mock can also be awaited with only a prompt.

    Returns:
        The canned response for the matched marker, or ``""``.
    """
    if "Please provide up to 2 necessary keywords" in prompt:
        return '["dataiku", "datarobot"]'
    if "Provide up to 4 queries related to your research topic" in prompt:
        return (
            '["Dataiku machine learning platform", "DataRobot AI platform comparison", '
            '"Dataiku vs DataRobot features", "Dataiku and DataRobot use cases"]'
        )
    if "sort the remaining search results" in prompt:
        return '[1,2]'
    if "Not relevant." in prompt:
        # Randomly answer with either the refusal text or the last 100
        # characters of the prompt (presumably so both outcomes get exercised).
        return "Not relevant" if random() > 0.5 else prompt[-100:]
    if "provide a detailed research report" in prompt:
        return f"# Research Report\n## Introduction\n{prompt}"
    return ""
@pytest.mark.asyncio
async def test_researcher(mocker):
    """Run the ``Researcher`` role on one topic with the LLM call mocked.

    ``BaseGPTAPI.aask`` is replaced by ``mock_llm_ask`` and the report
    directory is redirected to a temporary folder. The test passes when a
    ``<topic>.md`` file exists there and begins with ``# Research Report``.

    Args:
        mocker: pytest-mock fixture used for both patches.
    """
    topic = "dataiku vs. datarobot"
    with TemporaryDirectory() as dirname:
        mocker.patch("metagpt.provider.base_gpt_api.BaseGPTAPI.aask", mock_llm_ask)
        # Patch rather than assign: the module attribute is restored at
        # teardown, so later tests do not inherit a deleted temporary path.
        mocker.patch.object(researcher, "RESEARCH_PATH", Path(dirname))
        await researcher.Researcher().run(topic)
        report = researcher.RESEARCH_PATH / f"{topic}.md"
        assert report.read_text().startswith("# Research Report")

View file

@ -5,24 +5,44 @@
@Author : alexanderwu
@File : test_search_engine.py
"""
from __future__ import annotations
import pytest
from metagpt.logs import logger
from metagpt.tools import SearchEngineType
from metagpt.tools.search_engine import SearchEngine
class MockSearchEnine:
    """Offline search-engine stub whose ``run`` fabricates search hits."""

    async def run(self, query: str, max_results: int = 8, as_string: bool = True) -> str | list[dict[str, str]]:
        """Build ``max_results`` fake hits for ``query``.

        Hit number ``i`` has the URL ``https://metagpt.com/mock/<i>``, the
        query as its title, and the query repeated ``i`` times as its snippet.

        Args:
            query: Search text echoed into every hit.
            max_results: Number of hits to produce.
            as_string: When true, return the hits rendered one per line;
                otherwise return the list of hit dicts.

        Returns:
            A newline-joined string of the hits, or the list of hit dicts.
        """
        rets = [
            # f-string so that each hit gets its own index in the URL.
            {"url": f"https://metagpt.com/mock/{i}", "title": query, "snippet": query * i}
            for i in range(max_results)
        ]
        if as_string:
            # str.join only accepts strings, so render every dict first.
            return "\n".join(str(ret) for ret in rets)
        return rets
@pytest.mark.asyncio
@pytest.mark.usefixtures("llm_api")
async def test_search_engine(llm_api):
    """Query a default ``SearchEngine`` and require a non-empty response.

    NOTE(review): the name ``test_search_engine`` is defined again further
    down in this file; confirm which definition is meant to be collected.
    """
    engine = SearchEngine()
    # Each case is a pair; only the first element (the query) is used, the
    # second one is not checked by the assertion below.
    cases = [
        ("屈臣氏", "屈臣氏"),
    ]
    for query, _unused in cases:
        response = await engine.run(query)
        logger.info(response)
        assert len(response) > 0
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("search_engine_type", "run_func", "max_results", "as_string"),
    [
        (SearchEngineType.SERPAPI_GOOGLE, None, 8, True),
        (SearchEngineType.SERPAPI_GOOGLE, None, 4, False),
        (SearchEngineType.DIRECT_GOOGLE, None, 8, True),
        (SearchEngineType.DIRECT_GOOGLE, None, 6, False),
        (SearchEngineType.SERPER_GOOGLE, None, 8, True),
        (SearchEngineType.SERPER_GOOGLE, None, 6, False),
        (SearchEngineType.DUCK_DUCK_GO, None, 8, True),
        (SearchEngineType.DUCK_DUCK_GO, None, 6, False),
        (SearchEngineType.CUSTOM_ENGINE, MockSearchEnine().run, 8, False),
        (SearchEngineType.CUSTOM_ENGINE, MockSearchEnine().run, 6, False),
    ],
)
async def test_search_engine(search_engine_type, run_func, max_results, as_string):
    """Search for "metagpt" with each engine type and check the result shape.

    Carries ``pytest.mark.asyncio`` like the other coroutine tests so that the
    coroutine is awaited by the asyncio plugin.

    Args:
        search_engine_type: ``SearchEngineType`` member selecting the backend.
        run_func: Callable handed to ``SearchEngine`` as its second argument;
            ``None`` except for the custom-engine cases.
        max_results: Result count requested from the engine.
        as_string: Whether a string (true) or a list (false) is requested.
    """
    search_engine = SearchEngine(search_engine_type, run_func)
    rsp = await search_engine.run("metagpt", max_results=max_results, as_string=as_string)
    logger.info(rsp)
    if as_string:
        assert isinstance(rsp, str)
    else:
        # Only the list form has a result count that can be compared.
        assert isinstance(rsp, list)
        assert len(rsp) == max_results

View file

@ -1,4 +1,5 @@
import pytest
from metagpt.config import CONFIG
from metagpt.tools import web_browser_engine_playwright
@ -20,6 +21,7 @@ async def test_scrape_web_page(browser_type, use_proxy, kwagrs, url, urls, proxy
CONFIG.global_proxy = proxy
browser = web_browser_engine_playwright.PlaywrightWrapper(browser_type, **kwagrs)
result = await browser.run(url)
result = result.inner_text
assert isinstance(result, str)
assert "Deepwisdom" in result

View file

@ -1,4 +1,5 @@
import pytest
from metagpt.config import CONFIG
from metagpt.tools import web_browser_engine_selenium
@ -20,6 +21,7 @@ async def test_scrape_web_page(browser_type, use_proxy, url, urls, proxy, capfd)
CONFIG.global_proxy = proxy
browser = web_browser_engine_selenium.SeleniumWrapper(browser_type)
result = await browser.run(url)
result = result.inner_text
assert isinstance(result, str)
assert "Deepwisdom" in result
@ -27,7 +29,7 @@ async def test_scrape_web_page(browser_type, use_proxy, url, urls, proxy, capfd)
results = await browser.run(url, *urls)
assert isinstance(results, list)
assert len(results) == len(urls) + 1
assert all(("Deepwisdom" in i) for i in results)
assert all(("Deepwisdom" in i.inner_text) for i in results)
if use_proxy:
assert "Proxy:" in capfd.readouterr().out
finally:

View file

@ -0,0 +1,68 @@
from metagpt.utils import parse_html
# HTML fixture used by the tests in this file. Besides a title, heading,
# lists, a table, an image and a form, it holds anchors with a relative href,
# an absolute https href, a fragment href, an ftp href and a javascript href.
PAGE = """
<!DOCTYPE html>
<html>
<head>
<title>Random HTML Example</title>
</head>
<body>
<h1>This is a Heading</h1>
<p>This is a paragraph with <a href="test">a link</a> and some <em>emphasized</em> text.</p>
<ul>
<li>Item 1</li>
<li>Item 2</li>
<li>Item 3</li>
</ul>
<ol>
<li>Numbered Item 1</li>
<li>Numbered Item 2</li>
<li>Numbered Item 3</li>
</ol>
<table>
<tr>
<th>Header 1</th>
<th>Header 2</th>
</tr>
<tr>
<td>Row 1, Cell 1</td>
<td>Row 1, Cell 2</td>
</tr>
<tr>
<td>Row 2, Cell 1</td>
<td>Row 2, Cell 2</td>
</tr>
</table>
<img src="image.jpg" alt="Sample Image">
<form action="/submit" method="post">
<label for="name">Name:</label>
<input type="text" id="name" name="name" required>
<label for="email">Email:</label>
<input type="email" id="email" name="email" required>
<button type="submit">Submit</button>
</form>
<div class="box">
<p>This is a div with a class "box".</p>
<p><a href="https://metagpt.com">a link</a></p>
<p><a href="#section2"></a></p>
<p><a href="ftp://192.168.1.1:8080"></a></p>
<p><a href="javascript:alert('Hello');"></a></p>
</div>
</body>
</html>
"""
# Text that test_get_page_content expects parse_html.get_html_content to
# return for PAGE; the pieces are joined by implicit literal concatenation.
CONTENT = (
    'This is a HeadingThis is a paragraph witha linkand someemphasizedtext.Item 1Item 2Item 3Numbered Item 1Numbered '
    'Item 2Numbered Item 3Header 1Header 2Row 1, Cell 1Row 1, Cell 2Row 2, Cell 1Row 2, Cell 2Name:Email:SubmitThis is a div '
    'with a class "box".a link'
)
def test_web_page():
    """Check ``WebPage.title`` and ``WebPage.get_links`` for the ``PAGE`` fixture.

    The page is built with ``http://example.com`` as its URL; the expected
    links are ``http://example.com/test`` and ``https://metagpt.com``.
    """
    web_page = parse_html.WebPage(inner_text=CONTENT, html=PAGE, url="http://example.com")
    assert web_page.title == "Random HTML Example"
    expected_links = ["http://example.com/test", "https://metagpt.com"]
    found_links = list(web_page.get_links())
    assert found_links == expected_links
def test_get_page_content():
    """``get_html_content`` applied to ``PAGE`` must return exactly ``CONTENT``."""
    extracted = parse_html.get_html_content(PAGE, "http://example.com")
    assert extracted == CONTENT

View file

@ -0,0 +1,77 @@
import pytest
from metagpt.utils.text import (
decode_unicode_escape,
generate_prompt_chunk,
reduce_message_length,
split_paragraph,
)
def _msgs():
length = 20
while length:
yield "Hello," * 1000 * length
length -= 1
def _paragraphs(n):
return " ".join("Hello World." for _ in range(n))
@pytest.mark.parametrize(
    ("msgs", "model_name", "system_text", "reserved", "expected"),
    [
        (_msgs(), "gpt-3.5-turbo", "System", 1500, 1),
        (_msgs(), "gpt-3.5-turbo-16k", "System", 3000, 6),
        (_msgs(), "gpt-3.5-turbo-16k", "Hello," * 1000, 3000, 5),
        (_msgs(), "gpt-4", "System", 2000, 3),
        (_msgs(), "gpt-4", "Hello," * 1000, 2000, 2),
        (_msgs(), "gpt-4-32k", "System", 4000, 14),
        (_msgs(), "gpt-4-32k", "Hello," * 2000, 4000, 12),
    ],
)
def test_reduce_message_length(msgs, model_name, system_text, reserved, expected):
    """Check the size of the message chosen by ``reduce_message_length``.

    ``expected`` is the length of the returned message measured in units of
    1000 repetitions of ``"Hello,"``.
    """
    picked = reduce_message_length(msgs, model_name, system_text, reserved)
    thousands_of_hello = len(picked) / len("Hello,") / 1000
    assert thousands_of_hello == expected
@pytest.mark.parametrize(
    ("text", "prompt_template", "model_name", "system_text", "reserved", "expected"),
    [
        (" ".join(["Hello World."] * 1000), "Prompt: {}", "gpt-3.5-turbo", "System", 1500, 2),
        (" ".join(["Hello World."] * 1000), "Prompt: {}", "gpt-3.5-turbo-16k", "System", 3000, 1),
        (" ".join(["Hello World."] * 4000), "Prompt: {}", "gpt-4", "System", 2000, 2),
        (" ".join(["Hello World."] * 8000), "Prompt: {}", "gpt-4-32k", "System", 4000, 1),
    ],
)
def test_generate_prompt_chunk(text, prompt_template, model_name, system_text, reserved, expected):
    """Count the chunks produced by ``generate_prompt_chunk`` for each case.

    ``expected`` is the number of items the call yields for the given text,
    template, model name, system text and reserved value.
    """
    chunks = list(generate_prompt_chunk(text, prompt_template, model_name, system_text, reserved))
    assert len(chunks) == expected
@pytest.mark.parametrize(
    ("paragraph", "sep", "count", "expected"),
    [
        (_paragraphs(10), ".", 2, [_paragraphs(5), " " + _paragraphs(5)]),
        (_paragraphs(10), ".", 3, [_paragraphs(4), " " + _paragraphs(3), " " + _paragraphs(3)]),
        (_paragraphs(5) + "\n" + _paragraphs(3), "\n.", 2, [_paragraphs(5) + "\n", _paragraphs(3)]),
        ("......", ".", 2, ["...", "..."]),
        ("......", ".", 3, ["..", "..", ".."]),
        (".......", ".", 2, ["....", "..."]),
    ],
)
def test_split_paragraph(paragraph, sep, count, expected):
    """``split_paragraph(paragraph, sep, count)`` must equal the ``expected`` list."""
    pieces = split_paragraph(paragraph, sep, count)
    assert pieces == expected
@pytest.mark.parametrize(
    ("text", "expected"),
    [
        ("Hello\\nWorld", "Hello\nWorld"),
        ("Hello\\tWorld", "Hello\tWorld"),
        ("Hello\\u0020World", "Hello World"),
    ],
)
def test_decode_unicode_escape(text, expected):
    """Escaped ``\\n``, ``\\t`` and ``\\u0020`` sequences must decode to the real characters."""
    decoded = decode_unicode_escape(text)
    assert decoded == expected