feat: merge geekan:main

This commit is contained in:
莘权 马 2023-12-22 16:40:04 +08:00
commit 9a1909bb95
139 changed files with 4649 additions and 1504 deletions

View file

@ -7,7 +7,7 @@
"""
from typing import List, Tuple
from metagpt.actions import ActionOutput
from metagpt.actions.action_node import ActionNode
t_dict = {
"Required Python third-party packages": '"""\nflask==1.1.2\npygame==2.0.1\n"""\n',
@ -37,12 +37,12 @@ WRITE_TASKS_OUTPUT_MAPPING = {
def test_create_model_class():
test_class = ActionOutput.create_model_class("test_class", WRITE_TASKS_OUTPUT_MAPPING)
test_class = ActionNode.create_model_class("test_class", WRITE_TASKS_OUTPUT_MAPPING)
assert test_class.__name__ == "test_class"
def test_create_model_class_with_mapping():
t = ActionOutput.create_model_class("test_class_1", WRITE_TASKS_OUTPUT_MAPPING)
t = ActionNode.create_model_class("test_class_1", WRITE_TASKS_OUTPUT_MAPPING)
t1 = t(**t_dict)
value = t1.dict()["Task list"]
assert value == ["game.py", "app.py", "static/css/styles.css", "static/js/script.js", "templates/index.html"]

View file

@ -0,0 +1,16 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/7/1 22:50
@Author : alexanderwu
@File : test_azure_tts.py
"""
from metagpt.tools.azure_tts import AzureTTS
def test_azure_tts():
azure_tts = AzureTTS()
azure_tts.synthesize_speech("zh-CN", "zh-CN-YunxiNeural", "Boy", "你好,我是卡卡", "output.wav")
# 运行需要先配置 SUBSCRIPTION_KEY
# TODO: 这里如果要检验还要额外加上对应的asr才能确保前后生成是接近一致的但现在还没有

View file

@ -3,21 +3,27 @@
"""
@Time : 2023/9/13 00:26
@Author : fisherdeng
@File : test_detail_mining.py
@File : test_generate_questions.py
"""
import pytest
from metagpt.actions.detail_mining import DetailMining
from metagpt.actions.generate_questions import GenerateQuestions
from metagpt.logs import logger
context = """
## topic
如何做一个生日蛋糕
## record
我认为应该先准备好材料然后再开始做蛋糕
"""
@pytest.mark.asyncio
async def test_detail_mining():
topic = "如何做一个生日蛋糕"
record = "我认为应该先准备好材料,然后再开始做蛋糕。"
detail_mining = DetailMining("detail_mining")
rsp = await detail_mining.run(topic=topic, record=record)
async def test_generate_questions():
detail_mining = GenerateQuestions()
rsp = await detail_mining.run(context)
logger.info(f"{rsp.content=}")
assert "##OUTPUT" in rsp.content
assert "蛋糕" in rsp.content
assert "Questions" in rsp.content
assert "1." in rsp.content

View file

@ -0,0 +1,21 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/9/13 00:26
@Author : fisherdeng
@File : test_detail_mining.py
"""
import pytest
from metagpt.actions.prepare_interview import PrepareInterview
from metagpt.logs import logger
@pytest.mark.asyncio
async def test_prepare_interview():
action = PrepareInterview()
rsp = await action.run("I just graduated and hope to find a job as a Python engineer")
logger.info(f"{rsp.content=}")
assert "Questions" in rsp.content
assert "1." in rsp.content

View file

@ -0,0 +1,53 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/12/20 15:01
@Author : alexanderwu
@File : test_write_review.py
"""
import pytest
from metagpt.actions.write_review import WriteReview
CONTEXT = """
{
"Language": "zh_cn",
"Programming Language": "Python",
"Original Requirements": "写一个简单的2048",
"Project Name": "game_2048",
"Product Goals": [
"创建一个引人入胜的用户体验",
"确保高性能",
"提供可定制的功能"
],
"User Stories": [
"作为用户,我希望能够选择不同的难度级别",
"作为玩家,我希望在每局游戏结束后能看到我的得分"
],
"Competitive Analysis": [
"Python Snake Game: 界面简单,缺乏高级功能"
],
"Competitive Quadrant Chart": "quadrantChart\n title \"Reach and engagement of campaigns\"\n x-axis \"Low Reach\" --> \"High Reach\"\n y-axis \"Low Engagement\" --> \"High Engagement\"\n quadrant-1 \"我们应该扩展\"\n quadrant-2 \"需要推广\"\n quadrant-3 \"重新评估\"\n quadrant-4 \"可能需要改进\"\n \"Campaign A\": [0.3, 0.6]\n \"Campaign B\": [0.45, 0.23]\n \"Campaign C\": [0.57, 0.69]\n \"Campaign D\": [0.78, 0.34]\n \"Campaign E\": [0.40, 0.34]\n \"Campaign F\": [0.35, 0.78]\n \"Our Target Product\": [0.5, 0.6]",
"Requirement Analysis": "产品应该用户友好。",
"Requirement Pool": [
[
"P0",
"主要代码..."
],
[
"P0",
"游戏算法..."
]
],
"UI Design draft": "基本功能描述,简单的风格和布局。",
"Anything UNCLEAR": "..."
}
"""
@pytest.mark.asyncio
async def test_write_review():
write_review = WriteReview()
review = await write_review.run(CONTEXT)
assert review.instruct_content
assert review.get("LGTM") in ["LGTM", "LBTM"]

View file

@ -8,7 +8,7 @@
from typing import List
from metagpt.actions import UserRequirement, WritePRD
from metagpt.actions.action_output import ActionOutput
from metagpt.actions.action_node import ActionNode
from metagpt.memory.memory_storage import MemoryStorage
from metagpt.schema import Message
@ -42,7 +42,7 @@ def test_idea_message():
def test_actionout_message():
out_mapping = {"field1": (str, ...), "field2": (List[str], ...)}
out_data = {"field1": "field1 value", "field2": ["field2 value1", "field2 value2"]}
ic_obj = ActionOutput.create_model_class("prd", out_mapping)
ic_obj = ActionNode.create_model_class("prd", out_mapping)
role_id = "UTUser2(Architect)"
content = "The user has requested the creation of a command-line interface (CLI) snake game"

View file

@ -0,0 +1,41 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the unittest of google gemini api
from abc import ABC
from dataclasses import dataclass
import pytest
from metagpt.provider.google_gemini_api import GeminiGPTAPI
messages = [{"role": "user", "content": "who are you"}]
@dataclass
class MockGeminiResponse(ABC):
text: str
default_resp = MockGeminiResponse(text="I'm gemini from google")
def mock_llm_ask(self, messages: list[dict]) -> MockGeminiResponse:
return default_resp
def test_gemini_completion(mocker):
mocker.patch("metagpt.provider.google_gemini_api.GeminiGPTAPI.completion", mock_llm_ask)
resp = GeminiGPTAPI().completion(messages)
assert resp.text == default_resp.text
async def mock_llm_aask(self, messgaes: list[dict]) -> MockGeminiResponse:
return default_resp
@pytest.mark.asyncio
async def test_gemini_acompletion(mocker):
mocker.patch("metagpt.provider.google_gemini_api.GeminiGPTAPI.acompletion", mock_llm_aask)
resp = await GeminiGPTAPI().acompletion(messages)
assert resp.text == default_resp.text

View file

@ -1,3 +1,5 @@
from unittest.mock import Mock
import pytest
from metagpt.provider.openai_api import OpenAIGPTAPI
@ -78,3 +80,70 @@ def test_ask_code_list_str():
assert "language" in rsp
assert "code" in rsp
assert len(rsp["code"]) > 0
class TestOpenAI:
@pytest.fixture
def config(self):
return Mock(openai_api_key="test_key", openai_base_url="test_url", openai_proxy=None, openai_api_type="other")
@pytest.fixture
def config_azure(self):
return Mock(
openai_api_key="test_key",
openai_api_version="test_version",
openai_base_url="test_url",
openai_proxy=None,
openai_api_type="azure",
)
@pytest.fixture
def config_proxy(self):
return Mock(
openai_api_key="test_key",
openai_base_url="test_url",
openai_proxy="http://proxy.com",
openai_api_type="other",
)
@pytest.fixture
def config_azure_proxy(self):
return Mock(
openai_api_key="test_key",
openai_api_version="test_version",
openai_base_url="test_url",
openai_proxy="http://proxy.com",
openai_api_type="azure",
)
def test_make_client_kwargs_without_proxy(self, config):
instance = OpenAIGPTAPI()
instance.config = config
kwargs, async_kwargs = instance._make_client_kwargs()
assert kwargs == {"api_key": "test_key", "base_url": "test_url"}
assert async_kwargs == {"api_key": "test_key", "base_url": "test_url"}
assert "http_client" not in kwargs
assert "http_client" not in async_kwargs
def test_make_client_kwargs_without_proxy_azure(self, config_azure):
instance = OpenAIGPTAPI()
instance.config = config_azure
kwargs, async_kwargs = instance._make_client_kwargs()
assert kwargs == {"api_key": "test_key", "api_version": "test_version", "azure_endpoint": "test_url"}
assert async_kwargs == {"api_key": "test_key", "api_version": "test_version", "azure_endpoint": "test_url"}
assert "http_client" not in kwargs
assert "http_client" not in async_kwargs
def test_make_client_kwargs_with_proxy(self, config_proxy):
instance = OpenAIGPTAPI()
instance.config = config_proxy
kwargs, async_kwargs = instance._make_client_kwargs()
assert "http_client" in kwargs
assert "http_client" in async_kwargs
def test_make_client_kwargs_with_proxy_azure(self, config_azure_proxy):
instance = OpenAIGPTAPI()
instance.config = config_azure_proxy
kwargs, async_kwargs = instance._make_client_kwargs()
assert "http_client" in kwargs
assert "http_client" in async_kwargs

View file

@ -0,0 +1,11 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : unittest of Role
from metagpt.roles.role import Role
def test_role_desc():
role = Role(profile="Sales", desc="Best Seller")
assert role.profile == "Sales"
assert role._setting.desc == "Best Seller"

View file

@ -8,6 +8,7 @@ from functools import wraps
from importlib import import_module
from metagpt.actions import Action, ActionOutput, WritePRD
from metagpt.actions.action_node import ActionNode
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
@ -15,44 +16,38 @@ from metagpt.schema import Message
from metagpt.tools.sd_engine import SDEngine
PROMPT_TEMPLATE = """
# Context
{context}
## Format example
{format_example}
-----
Role: You are a UserInterface Designer; the goal is to finish a UI design according to PRD, give a design description, and select specified elements and UI style.
Requirements: Based on the context, fill in the following missing information, provide detailed HTML and CSS code
Attention: Use '##' to split sections, not '#', and '## <SECTION_NAME>' SHOULD WRITE BEFORE the code and triple quote.
## UI Design Description:Provide as Plain text, place the design objective here
## Selected Elements:Provide as Plain text, up to 5 specified elements, clear and simple
## HTML Layout:Provide as Plain text, use standard HTML code
## CSS Styles (styles.css):Provide as Plain text,use standard css code
## Anything UNCLEAR:Provide as Plain text. Try to clarify it.
## Role
You are a UserInterface Designer; the goal is to finish a UI design according to PRD, give a design description, and select specified elements and UI style.
"""
FORMAT_EXAMPLE = """
UI_DESIGN_DESC = ActionNode(
key="UI Design Desc",
expected_type=str,
instruction="place the design objective here",
example="Snake games are classic and addictive games with simple yet engaging elements. Here are the main elements"
" commonly found in snake games",
)
## UI Design Description
```Snake games are classic and addictive games with simple yet engaging elements. Here are the main elements commonly found in snake games ```
SELECTED_ELEMENTS = ActionNode(
key="Selected Elements",
expected_type=list[str],
instruction="up to 5 specified elements, clear and simple",
example=[
"Game Grid: The game grid is a rectangular...",
"Snake: The player controls a snake that moves across the grid...",
"Food: Food items (often represented as small objects or differently colored blocks)",
"Score: The player's score increases each time the snake eats a piece of food. The longer the snake becomes, the higher the score.",
"Game Over: The game ends when the snake collides with itself or an obstacle. At this point, the player's final score is displayed, and they are given the option to restart the game.",
],
)
## Selected Elements
Game Grid: The game grid is a rectangular...
Snake: The player controls a snake that moves across the grid...
Food: Food items (often represented as small objects or differently colored blocks)
Score: The player's score increases each time the snake eats a piece of food. The longer the snake becomes, the higher the score.
Game Over: The game ends when the snake collides with itself or an obstacle. At this point, the player's final score is displayed, and they are given the option to restart the game.
## HTML Layout
<!DOCTYPE html>
HTML_LAYOUT = ActionNode(
key="HTML Layout",
expected_type=str,
instruction="use standard HTML code",
example="""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
@ -69,9 +64,14 @@ Game Over: The game ends when the snake collides with itself or an obstacle. At
</div>
</body>
</html>
""",
)
## CSS Styles (styles.css)
body {
CSS_STYLES = ActionNode(
key="CSS Styles",
expected_type=str,
instruction="use standard css code",
example="""body {
display: flex;
justify-content: center;
align-items: center;
@ -119,19 +119,25 @@ body {
color: #ff0000;
display: none;
}
""",
)
## Anything UNCLEAR
There are no unclear points.
ANYTHING_UNCLEAR = ActionNode(
key="Anything UNCLEAR",
expected_type=str,
instruction="Mention any aspects of the project that are unclear and try to clarify them.",
example="...",
)
"""
NODES = [
UI_DESIGN_DESC,
SELECTED_ELEMENTS,
HTML_LAYOUT,
CSS_STYLES,
ANYTHING_UNCLEAR,
]
OUTPUT_MAPPING = {
"UI Design Description": (str, ...),
"Selected Elements": (str, ...),
"HTML Layout": (str, ...),
"CSS Styles (styles.css)": (str, ...),
"Anything UNCLEAR": (str, ...),
}
UI_DESIGN_NODE = ActionNode.from_children("UI_DESIGN", NODES)
def load_engine(func):
@ -221,10 +227,8 @@ class UIDesign(Action):
css_file_path = save_dir / "ui_design.css"
html_file_path = save_dir / "ui_design.html"
with open(css_file_path, "w") as css_file:
css_file.write(css_content)
with open(html_file_path, "w") as html_file:
html_file.write(html_content)
css_file_path.write_text(css_content)
html_file_path.write_text(html_content)
async def run(self, requirements: list[Message], *args, **kwargs) -> ActionOutput:
"""Run the UI Design action."""
@ -232,9 +236,9 @@ class UIDesign(Action):
context = requirements[-1].content
ui_design_draft = self.parse_requirement(context=context)
# todo: parse requirements str
prompt = PROMPT_TEMPLATE.format(context=ui_design_draft, format_example=FORMAT_EXAMPLE)
prompt = PROMPT_TEMPLATE.format(context=ui_design_draft)
logger.info(prompt)
ui_describe = await self._aask_v1(prompt, "ui_design", OUTPUT_MAPPING)
ui_describe = await UI_DESIGN_NODE.fill(prompt)
logger.info(ui_describe.content)
logger.info(ui_describe.instruct_content)
css = self.parse_css_code(context=ui_describe.content)

View file

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
# @Date : 11/22/2023 11:48 AM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :

View file

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
# @Date : 11/22/2023 11:48 AM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import pytest
from metagpt.actions import Action
from metagpt.llm import LLM
def test_action_serialize():
action = Action()
ser_action_dict = action.dict()
assert "name" in ser_action_dict
# assert "llm" not in ser_action_dict # not export
@pytest.mark.asyncio
async def test_action_deserialize():
action = Action()
serialized_data = action.dict()
new_action = Action(**serialized_data)
assert new_action.name == ""
assert new_action.llm == LLM()
assert len(await new_action._aask("who are you")) > 0

View file

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# @Date : 11/26/2023 2:04 PM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import pytest
from metagpt.actions.action import Action
from metagpt.roles.architect import Architect
def test_architect_serialize():
role = Architect()
ser_role_dict = role.dict(by_alias=True)
assert "name" in ser_role_dict
assert "_states" in ser_role_dict
assert "_actions" in ser_role_dict
@pytest.mark.asyncio
async def test_architect_deserialize():
role = Architect()
ser_role_dict = role.dict(by_alias=True)
new_role = Architect(**ser_role_dict)
# new_role = Architect.deserialize(ser_role_dict)
assert new_role.name == "Bob"
assert len(new_role._actions) == 1
assert isinstance(new_role._actions[0], Action)
await new_role._actions[0].run(with_messages="write a cli snake game")

View file

@ -0,0 +1,88 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
import shutil
from metagpt.actions.action_node import ActionNode
from metagpt.actions.add_requirement import UserRequirement
from metagpt.actions.project_management import WriteTasks
from metagpt.environment import Environment
from metagpt.roles.project_manager import ProjectManager
from metagpt.schema import Message
from metagpt.utils.common import any_to_str
from tests.metagpt.serialize_deserialize.test_serdeser_base import (
ActionOK,
RoleC,
serdeser_path,
)
def test_env_serialize():
env = Environment()
ser_env_dict = env.dict()
assert "roles" in ser_env_dict
def test_env_deserialize():
env = Environment()
env.publish_message(message=Message(content="test env serialize"))
ser_env_dict = env.dict()
new_env = Environment(**ser_env_dict)
assert len(new_env.roles) == 0
assert len(new_env.history) == 25
def test_environment_serdeser():
out_mapping = {"field1": (list[str], ...)}
out_data = {"field1": ["field1 value1", "field1 value2"]}
ic_obj = ActionNode.create_model_class("prd", out_mapping)
message = Message(
content="prd", instruct_content=ic_obj(**out_data), role="product manager", cause_by=any_to_str(UserRequirement)
)
environment = Environment()
role_c = RoleC()
environment.add_role(role_c)
environment.publish_message(message)
ser_data = environment.dict()
assert ser_data["roles"]["Role C"]["name"] == "RoleC"
new_env: Environment = Environment(**ser_data)
assert len(new_env.roles) == 1
assert list(new_env.roles.values())[0]._states == list(environment.roles.values())[0]._states
assert list(new_env.roles.values())[0]._actions == list(environment.roles.values())[0]._actions
assert isinstance(list(environment.roles.values())[0]._actions[0], ActionOK)
assert type(list(new_env.roles.values())[0]._actions[0]) == ActionOK
def test_environment_serdeser_v2():
environment = Environment()
pm = ProjectManager()
environment.add_role(pm)
ser_data = environment.dict()
new_env: Environment = Environment(**ser_data)
role = new_env.get_role(pm.profile)
assert isinstance(role, ProjectManager)
assert isinstance(role._actions[0], WriteTasks)
assert isinstance(list(new_env.roles.values())[0]._actions[0], WriteTasks)
def test_environment_serdeser_save():
environment = Environment()
role_c = RoleC()
shutil.rmtree(serdeser_path.joinpath("team"), ignore_errors=True)
stg_path = serdeser_path.joinpath("team", "environment")
environment.add_role(role_c)
environment.serialize(stg_path)
new_env: Environment = Environment.deserialize(stg_path)
assert len(new_env.roles) == 1
assert type(list(new_env.roles.values())[0]._actions[0]) == ActionOK

View file

@ -0,0 +1,63 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : unittest of memory
from pydantic import BaseModel
from metagpt.actions.action_node import ActionNode
from metagpt.actions.add_requirement import UserRequirement
from metagpt.actions.design_api import WriteDesign
from metagpt.memory.memory import Memory
from metagpt.schema import Message
from metagpt.utils.common import any_to_str
from tests.metagpt.serialize_deserialize.test_serdeser_base import serdeser_path
def test_memory_serdeser():
msg1 = Message(role="Boss", content="write a snake game", cause_by=UserRequirement)
out_mapping = {"field2": (list[str], ...)}
out_data = {"field2": ["field2 value1", "field2 value2"]}
ic_obj = ActionNode.create_model_class("system_design", out_mapping)
msg2 = Message(
role="Architect", instruct_content=ic_obj(**out_data), content="system design content", cause_by=WriteDesign
)
memory = Memory()
memory.add_batch([msg1, msg2])
ser_data = memory.dict()
new_memory = Memory(**ser_data)
assert new_memory.count() == 2
new_msg2 = new_memory.get(2)[0]
assert isinstance(new_msg2, BaseModel)
assert isinstance(new_memory.storage[-1], BaseModel)
assert new_memory.storage[-1].cause_by == any_to_str(WriteDesign)
assert new_msg2.role == "Boss"
def test_memory_serdeser_save():
msg1 = Message(role="User", content="write a 2048 game", cause_by=UserRequirement)
out_mapping = {"field1": (list[str], ...)}
out_data = {"field1": ["field1 value1", "field1 value2"]}
ic_obj = ActionNode.create_model_class("system_design", out_mapping)
msg2 = Message(
role="Architect", instruct_content=ic_obj(**out_data), content="system design content", cause_by=WriteDesign
)
memory = Memory()
memory.add_batch([msg1, msg2])
stg_path = serdeser_path.joinpath("team", "environment")
memory.serialize(stg_path)
assert stg_path.joinpath("memory.json").exists()
new_memory = Memory.deserialize(stg_path)
assert new_memory.count() == 2
new_msg2 = new_memory.get(1)[0]
assert new_msg2.instruct_content.field1 == ["field1 value1", "field1 value2"]
assert new_msg2.cause_by == any_to_str(WriteDesign)
assert len(new_memory.index) == 2
stg_path.joinpath("memory.json").unlink()

View file

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
# @Date : 11/26/2023 2:07 PM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import pytest
from metagpt.actions.action import Action
from metagpt.roles.product_manager import ProductManager
from metagpt.schema import Message
@pytest.mark.asyncio
async def test_product_manager_deserialize():
role = ProductManager()
ser_role_dict = role.dict(by_alias=True)
new_role = ProductManager(**ser_role_dict)
assert new_role.name == "Alice"
assert len(new_role._actions) == 2
assert isinstance(new_role._actions[0], Action)
await new_role._actions[0].run([Message(content="write a cli snake game")])

View file

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# @Date : 11/26/2023 2:06 PM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import pytest
from metagpt.actions.action import Action
from metagpt.actions.project_management import WriteTasks
from metagpt.roles.project_manager import ProjectManager
def test_project_manager_serialize():
role = ProjectManager()
ser_role_dict = role.dict(by_alias=True)
assert "name" in ser_role_dict
assert "_states" in ser_role_dict
assert "_actions" in ser_role_dict
@pytest.mark.asyncio
async def test_project_manager_deserialize():
role = ProjectManager()
ser_role_dict = role.dict(by_alias=True)
new_role = ProjectManager(**ser_role_dict)
assert new_role.name == "Eve"
assert len(new_role._actions) == 1
assert isinstance(new_role._actions[0], Action)
assert isinstance(new_role._actions[0], WriteTasks)
# await new_role._actions[0].run(context="write a cli snake game")

View file

@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
# @Date : 11/23/2023 4:49 PM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import shutil
import pytest
from metagpt.actions import WriteCode
from metagpt.actions.add_requirement import UserRequirement
from metagpt.const import SERDESER_PATH
from metagpt.logs import logger
from metagpt.roles.engineer import Engineer
from metagpt.roles.product_manager import ProductManager
from metagpt.roles.role import Role
from metagpt.schema import Message
from metagpt.utils.common import format_trackback_info
from tests.metagpt.serialize_deserialize.test_serdeser_base import (
RoleA,
RoleB,
RoleC,
serdeser_path,
)
def test_roles():
role_a = RoleA()
assert len(role_a._rc.watch) == 1
role_b = RoleB()
assert len(role_a._rc.watch) == 1
assert len(role_b._rc.watch) == 1
def test_role_serialize():
role = Role()
ser_role_dict = role.dict(by_alias=True)
assert "name" in ser_role_dict
assert "_states" in ser_role_dict
assert "_actions" in ser_role_dict
def test_engineer_serialize():
role = Engineer()
ser_role_dict = role.dict(by_alias=True)
assert "name" in ser_role_dict
assert "_states" in ser_role_dict
assert "_actions" in ser_role_dict
@pytest.mark.asyncio
async def test_engineer_deserialize():
role = Engineer(use_code_review=True)
ser_role_dict = role.dict(by_alias=True)
new_role = Engineer(**ser_role_dict)
assert new_role.name == "Alex"
assert new_role.use_code_review is True
assert len(new_role._actions) == 1
assert isinstance(new_role._actions[0], WriteCode)
# await new_role._actions[0].run(context="write a cli snake game", filename="test_code")
def test_role_serdeser_save():
stg_path_prefix = serdeser_path.joinpath("team", "environment", "roles")
shutil.rmtree(serdeser_path.joinpath("team"), ignore_errors=True)
pm = ProductManager()
role_tag = f"{pm.__class__.__name__}_{pm.name}"
stg_path = stg_path_prefix.joinpath(role_tag)
pm.serialize(stg_path)
new_pm = Role.deserialize(stg_path)
assert new_pm.name == pm.name
assert len(new_pm.get_memories(1)) == 0
@pytest.mark.asyncio
async def test_role_serdeser_interrupt():
role_c = RoleC()
shutil.rmtree(SERDESER_PATH.joinpath("team"), ignore_errors=True)
stg_path = SERDESER_PATH.joinpath("team", "environment", "roles", f"{role_c.__class__.__name__}_{role_c.name}")
try:
await role_c.run(with_message=Message(content="demo", cause_by=UserRequirement))
except Exception:
logger.error(f"Exception in `role_a.run`, detail: {format_trackback_info()}")
role_c.serialize(stg_path)
assert role_c._rc.memory.count() == 1
new_role_a: Role = Role.deserialize(stg_path)
assert new_role_a._rc.state == 1
with pytest.raises(Exception):
await role_c.run(with_message=Message(content="demo", cause_by=UserRequirement))

View file

@ -0,0 +1,38 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : unittest of schema ser&deser
from metagpt.actions.action_node import ActionNode
from metagpt.actions.write_code import WriteCode
from metagpt.schema import Message
from metagpt.utils.common import any_to_str
from tests.metagpt.serialize_deserialize.test_serdeser_base import MockMessage
def test_message_serdeser():
out_mapping = {"field3": (str, ...), "field4": (list[str], ...)}
out_data = {"field3": "field3 value3", "field4": ["field4 value1", "field4 value2"]}
ic_obj = ActionNode.create_model_class("code", out_mapping)
message = Message(content="code", instruct_content=ic_obj(**out_data), role="engineer", cause_by=WriteCode)
ser_data = message.dict()
assert ser_data["cause_by"] == "metagpt.actions.write_code.WriteCode"
assert ser_data["instruct_content"]["class"] == "code"
new_message = Message(**ser_data)
assert new_message.cause_by == any_to_str(WriteCode)
assert new_message.cause_by in [any_to_str(WriteCode)]
assert new_message.instruct_content == ic_obj(**out_data)
def test_message_without_postprocess():
"""to explain `instruct_content` should be postprocessed"""
out_mapping = {"field1": (list[str], ...)}
out_data = {"field1": ["field1 value1", "field1 value2"]}
ic_obj = ActionNode.create_model_class("code", out_mapping)
message = MockMessage(content="code", instruct_content=ic_obj(**out_data))
ser_data = message.dict()
assert ser_data["instruct_content"] == {"field1": ["field1 value1", "field1 value2"]}
new_message = MockMessage(**ser_data)
assert new_message.instruct_content != ic_obj(**out_data)

View file

@ -0,0 +1,87 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : base test actions / roles used in unittest
import asyncio
from pathlib import Path
from pydantic import BaseModel, Field
from metagpt.actions import Action, ActionOutput
from metagpt.actions.action_node import ActionNode
from metagpt.actions.add_requirement import UserRequirement
from metagpt.roles.role import Role, RoleReactMode
serdeser_path = Path(__file__).absolute().parent.joinpath("..", "..", "data", "serdeser_storage")
class MockMessage(BaseModel):
"""to test normal dict without postprocess"""
content: str = ""
instruct_content: BaseModel = Field(default=None)
class ActionPass(Action):
name: str = Field(default="ActionPass")
async def run(self, messages: list["Message"]) -> ActionOutput:
await asyncio.sleep(5) # sleep to make other roles can watch the executed Message
output_mapping = {"result": (str, ...)}
pass_class = ActionNode.create_model_class("pass", output_mapping)
pass_output = ActionOutput("ActionPass run passed", pass_class(**{"result": "pass result"}))
return pass_output
class ActionOK(Action):
name: str = Field(default="ActionOK")
async def run(self, messages: list["Message"]) -> str:
await asyncio.sleep(5)
return "ok"
class ActionRaise(Action):
name: str = Field(default="ActionRaise")
async def run(self, messages: list["Message"]) -> str:
raise RuntimeError("parse error in ActionRaise")
class RoleA(Role):
name: str = Field(default="RoleA")
profile: str = Field(default="Role A")
goal: str = "RoleA's goal"
constraints: str = "RoleA's constraints"
def __init__(self, **kwargs):
super(RoleA, self).__init__(**kwargs)
self._init_actions([ActionPass])
self._watch([UserRequirement])
class RoleB(Role):
name: str = Field(default="RoleB")
profile: str = Field(default="Role B")
goal: str = "RoleB's goal"
constraints: str = "RoleB's constraints"
def __init__(self, **kwargs):
super(RoleB, self).__init__(**kwargs)
self._init_actions([ActionOK, ActionRaise])
self._watch([ActionPass])
self._rc.react_mode = RoleReactMode.BY_ORDER
class RoleC(Role):
name: str = Field(default="RoleC")
profile: str = Field(default="Role C")
goal: str = "RoleC's goal"
constraints: str = "RoleC's constraints"
def __init__(self, **kwargs):
super(RoleC, self).__init__(**kwargs)
self._init_actions([ActionOK, ActionRaise])
self._watch([UserRequirement])
self._rc.react_mode = RoleReactMode.BY_ORDER

View file

@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
# @Date : 11/27/2023 10:07 AM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import shutil
import pytest
from metagpt.const import SERDESER_PATH
from metagpt.logs import logger
from metagpt.roles import Architect, ProductManager, ProjectManager
from metagpt.team import Team
from tests.metagpt.serialize_deserialize.test_serdeser_base import (
ActionOK,
RoleA,
RoleB,
RoleC,
serdeser_path,
)
def test_team_deserialize():
company = Team()
pm = ProductManager()
arch = Architect()
company.hire(
[
pm,
arch,
ProjectManager(),
]
)
assert len(company.env.get_roles()) == 3
ser_company = company.dict()
new_company = Team(**ser_company)
assert len(new_company.env.get_roles()) == 3
assert new_company.env.get_role(pm.profile) is not None
new_pm = new_company.env.get_role(pm.profile)
assert type(new_pm) == ProductManager
assert new_company.env.get_role(pm.profile) is not None
assert new_company.env.get_role(arch.profile) is not None
def test_team_serdeser_save():
company = Team()
company.hire([RoleC()])
stg_path = serdeser_path.joinpath("team")
shutil.rmtree(stg_path, ignore_errors=True)
company.serialize(stg_path=stg_path)
new_company = Team.deserialize(stg_path)
assert len(new_company.env.roles) == 1
@pytest.mark.asyncio
async def test_team_recover():
idea = "write a snake game"
stg_path = SERDESER_PATH.joinpath("team")
shutil.rmtree(stg_path, ignore_errors=True)
company = Team()
role_c = RoleC()
company.hire([role_c])
company.run_project(idea)
await company.run(n_round=4)
ser_data = company.dict()
new_company = Team(**ser_data)
new_role_c = new_company.env.get_role(role_c.profile)
# assert new_role_c._rc.memory == role_c._rc.memory # TODO
assert new_role_c._rc.env != role_c._rc.env # TODO
assert type(list(new_company.env.roles.values())[0]._actions[0]) == ActionOK
new_company.run_project(idea)
await new_company.run(n_round=4)
@pytest.mark.asyncio
async def test_team_recover_save():
idea = "write a 2048 web game"
stg_path = SERDESER_PATH.joinpath("team")
shutil.rmtree(stg_path, ignore_errors=True)
company = Team()
role_c = RoleC()
company.hire([role_c])
company.run_project(idea)
await company.run(n_round=4)
new_company = Team.deserialize(stg_path)
new_role_c = new_company.env.get_role(role_c.profile)
# assert new_role_c._rc.memory == role_c._rc.memory
assert new_role_c._rc.env != role_c._rc.env
assert new_role_c.recovered != role_c.recovered # here cause previous ut is `!=`
assert new_role_c._rc.todo != role_c._rc.todo # serialize exclude `_rc.todo`
assert new_role_c._rc.news != role_c._rc.news # serialize exclude `_rc.news`
new_company.run_project(idea)
await new_company.run(n_round=4)
@pytest.mark.asyncio
async def test_team_recover_multi_roles_save():
idea = "write a snake game"
stg_path = SERDESER_PATH.joinpath("team")
shutil.rmtree(stg_path, ignore_errors=True)
role_a = RoleA()
role_b = RoleB()
assert role_a.subscription == {"tests.metagpt.serialize_deserialize.test_serdeser_base.RoleA", "RoleA"}
assert role_b.subscription == {"tests.metagpt.serialize_deserialize.test_serdeser_base.RoleB", "RoleB"}
assert role_b._rc.watch == {"tests.metagpt.serialize_deserialize.test_serdeser_base.ActionPass"}
company = Team()
company.hire([role_a, role_b])
company.run_project(idea)
await company.run(n_round=4)
logger.info("Team recovered")
new_company = Team.deserialize(stg_path)
new_company.run_project(idea)
assert new_company.env.get_role(role_b.profile)._rc.state == 1
await new_company.run(n_round=4)

View file

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# @Date : 11/23/2023 10:56 AM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import pytest
from metagpt.actions import WriteCode
from metagpt.llm import LLM
from metagpt.schema import CodingContext, Document
def test_write_design_serialize():
action = WriteCode()
ser_action_dict = action.dict()
assert ser_action_dict["name"] == "WriteCode"
# assert "llm" in ser_action_dict # not export
@pytest.mark.asyncio
async def test_write_code_deserialize():
context = CodingContext(
filename="test_code.py", design_doc=Document(content="write add function to calculate two numbers")
)
doc = Document(content=context.json())
action = WriteCode(context=doc)
serialized_data = action.dict()
new_action = WriteCode(**serialized_data)
assert new_action.name == "WriteCode"
assert new_action.llm == LLM()
await action.run()

View file

@ -0,0 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : unittest of WriteCodeReview SerDeser
import pytest
from metagpt.actions import WriteCodeReview
from metagpt.llm import LLM
from metagpt.schema import CodingContext, Document
@pytest.mark.asyncio
async def test_write_code_review_deserialize():
code_content = """
def div(a: int, b: int = 0):
return a / b
"""
context = CodingContext(
filename="test_op.py",
design_doc=Document(content="divide two numbers"),
code_doc=Document(content=code_content),
)
action = WriteCodeReview(context=context)
serialized_data = action.dict()
assert serialized_data["name"] == "WriteCodeReview"
new_action = WriteCodeReview(**serialized_data)
assert new_action.name == "WriteCodeReview"
assert new_action.llm == LLM()
await new_action.run()

View file

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
# @Date : 11/22/2023 8:19 PM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import pytest
from metagpt.actions import WriteDesign, WriteTasks
from metagpt.llm import LLM
def test_write_design_serialize():
action = WriteDesign()
ser_action_dict = action.dict()
assert "name" in ser_action_dict
# assert "llm" in ser_action_dict # not export
def test_write_task_serialize():
action = WriteTasks()
ser_action_dict = action.dict()
assert "name" in ser_action_dict
# assert "llm" in ser_action_dict # not export
@pytest.mark.asyncio
async def test_write_design_deserialize():
action = WriteDesign()
serialized_data = action.dict()
new_action = WriteDesign(**serialized_data)
assert new_action.name == ""
assert new_action.llm == LLM()
await new_action.run(with_messages="write a cli snake game")
@pytest.mark.asyncio
async def test_write_task_deserialize():
action = WriteTasks()
serialized_data = action.dict()
new_action = WriteTasks(**serialized_data)
assert new_action.name == "CreateTasks"
assert new_action.llm == LLM()
await new_action.run(with_messages="write a cli snake game")

View file

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# @Date : 11/22/2023 1:47 PM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import pytest
from metagpt.actions import WritePRD
from metagpt.llm import LLM
from metagpt.schema import Message
def test_action_serialize():
action = WritePRD()
ser_action_dict = action.dict()
assert "name" in ser_action_dict
# assert "llm" in ser_action_dict # not export
@pytest.mark.asyncio
async def test_action_deserialize():
action = WritePRD()
serialized_data = action.dict()
new_action = WritePRD(**serialized_data)
assert new_action.name == ""
assert new_action.llm == LLM()
action_output = await new_action.run(with_messages=Message(content="write a cli snake game"))
assert len(action_output.content) > 0

View file

@ -8,6 +8,8 @@
"""
from pathlib import Path
import pytest
from metagpt.actions import UserRequirement
@ -17,6 +19,8 @@ from metagpt.logs import logger
from metagpt.roles import Architect, ProductManager, Role
from metagpt.schema import Message
serdeser_path = Path(__file__).absolute().parent.joinpath("../data/serdeser_storage")
@pytest.fixture
def env():
@ -52,6 +56,7 @@ async def test_publish_and_process_message(env: Environment):
)
env.add_roles([product_manager, architect])
env.publish_message(Message(role="User", content="需要一个基于LLM做总结的搜索引擎", cause_by=UserRequirement))
await env.run(k=2)
logger.info(f"{env.history=}")

View file

@ -23,7 +23,7 @@ def test_all_messages():
UserMessage(test_content),
SystemMessage(test_content),
AIMessage(test_content),
Message(test_content, role="QA"),
Message(content=test_content, role="QA"),
]
for msg in msgs:
assert msg.content == test_content

View file

@ -0,0 +1,342 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/5/11 14:45
@Author : alexanderwu
@File : test_llm.py
"""
import pytest
from metagpt.llm import LLM
CODE_REVIEW_SMALLEST_CONTEXT = """
## game.js
```Code
// game.js
class Game {
constructor() {
this.board = this.createEmptyBoard();
this.score = 0;
this.bestScore = 0;
}
createEmptyBoard() {
const board = [];
for (let i = 0; i < 4; i++) {
board[i] = [0, 0, 0, 0];
}
return board;
}
startGame() {
this.board = this.createEmptyBoard();
this.score = 0;
this.addRandomTile();
this.addRandomTile();
}
addRandomTile() {
let emptyCells = [];
for (let r = 0; r < 4; r++) {
for (let c = 0; c < 4; c++) {
if (this.board[r][c] === 0) {
emptyCells.push({ r, c });
}
}
}
if (emptyCells.length > 0) {
let randomCell = emptyCells[Math.floor(Math.random() * emptyCells.length)];
this.board[randomCell.r][randomCell.c] = Math.random() < 0.9 ? 2 : 4;
}
}
move(direction) {
// This function will handle the logic for moving tiles
// in the specified direction and merging them
// It will also update the score and add a new random tile if the move is successful
// The actual implementation of this function is complex and would require
// a significant amount of code to handle all the cases for moving and merging tiles
// For the purposes of this example, we will not implement the full logic
// Instead, we will just call addRandomTile to simulate a move
this.addRandomTile();
}
getBoard() {
return this.board;
}
getScore() {
return this.score;
}
getBestScore() {
return this.bestScore;
}
setBestScore(score) {
this.bestScore = score;
}
}
```
"""
MOVE_DRAFT = """
## move function draft
```javascript
move(direction) {
let moved = false;
switch (direction) {
case 'up':
for (let c = 0; c < 4; c++) {
for (let r = 1; r < 4; r++) {
if (this.board[r][c] !== 0) {
let row = r;
while (row > 0 && this.board[row - 1][c] === 0) {
this.board[row - 1][c] = this.board[row][c];
this.board[row][c] = 0;
row--;
moved = true;
}
if (row > 0 && this.board[row - 1][c] === this.board[row][c]) {
this.board[row - 1][c] *= 2;
this.board[row][c] = 0;
this.score += this.board[row - 1][c];
moved = true;
}
}
}
}
break;
case 'down':
// Implement logic for moving tiles down
// Similar to the 'up' case but iterating in reverse order
// and checking for merging in the opposite direction
break;
case 'left':
// Implement logic for moving tiles left
// Similar to the 'up' case but iterating over columns first
// and checking for merging in the opposite direction
break;
case 'right':
// Implement logic for moving tiles right
// Similar to the 'up' case but iterating over columns in reverse order
// and checking for merging in the opposite direction
break;
}
if (moved) {
this.addRandomTile();
}
}
```
"""
FUNCTION_TO_MERMAID_CLASS = """
## context
```
class UIDesign(Action):
#Class representing the UI Design action.
def __init__(self, name, context=None, llm=None):
super().__init__(name, context, llm) # 需要调用LLM进一步丰富UI设计的prompt
@parse
def parse_requirement(self, context: str):
#Parse UI Design draft from the context using regex.
pattern = r"## UI Design draft.*?\n(.*?)## Anything UNCLEAR"
return context, pattern
@parse
def parse_ui_elements(self, context: str):
#Parse Selected Elements from the context using regex.
pattern = r"## Selected Elements.*?\n(.*?)## HTML Layout"
return context, pattern
@parse
def parse_css_code(self, context: str):
pattern = r"```css.*?\n(.*?)## Anything UNCLEAR"
return context, pattern
@parse
def parse_html_code(self, context: str):
pattern = r"```html.*?\n(.*?)```"
return context, pattern
async def draw_icons(self, context, *args, **kwargs):
#Draw icons using SDEngine.
engine = SDEngine()
icon_prompts = self.parse_ui_elements(context)
icons = icon_prompts.split("\n")
icons = [s for s in icons if len(s.strip()) > 0]
prompts_batch = []
for icon_prompt in icons:
# fixme: 添加icon lora
prompt = engine.construct_payload(icon_prompt + ".<lora:WZ0710_AW81e-3_30e3b128d64T32_goon0.5>")
prompts_batch.append(prompt)
await engine.run_t2i(prompts_batch)
logger.info("Finish icon design using StableDiffusion API")
async def _save(self, css_content, html_content):
save_dir = CONFIG.workspace_path / "resources" / "codes"
if not os.path.exists(save_dir):
os.makedirs(save_dir, exist_ok=True)
# Save CSS and HTML content to files
css_file_path = save_dir / "ui_design.css"
html_file_path = save_dir / "ui_design.html"
with open(css_file_path, "w") as css_file:
css_file.write(css_content)
with open(html_file_path, "w") as html_file:
html_file.write(html_content)
async def run(self, requirements: list[Message], *args, **kwargs) -> ActionOutput:
#Run the UI Design action.
# fixme: update prompt (根据需求细化prompt
context = requirements[-1].content
ui_design_draft = self.parse_requirement(context=context)
# todo: parse requirements str
prompt = PROMPT_TEMPLATE.format(context=ui_design_draft, format_example=FORMAT_EXAMPLE)
logger.info(prompt)
ui_describe = await self._aask_v1(prompt, "ui_design", OUTPUT_MAPPING)
logger.info(ui_describe.content)
logger.info(ui_describe.instruct_content)
css = self.parse_css_code(context=ui_describe.content)
html = self.parse_html_code(context=ui_describe.content)
await self._save(css_content=css, html_content=html)
await self.draw_icons(ui_describe.content)
return ui_describe
```
-----
## format example
[CONTENT]
{
"ClassView": "classDiagram\n class A {\n -int x\n +int y\n -int speed\n -int direction\n +__init__(x: int, y: int, speed: int, direction: int)\n +change_direction(new_direction: int) None\n +move() None\n }\n "
}
[/CONTENT]
## nodes: "<node>: <type> # <comment>"
- ClassView: <class 'str'> # Generate the mermaid class diagram corresponding to source code in "context."
## constraint
- Language: Please use the same language as the user input.
- Format: output wrapped inside [CONTENT][/CONTENT] as format example, nothing else.
## action
Fill in the above nodes(ClassView) based on the format example.
"""
MOVE_FUNCTION = """
## move function implementation
```javascript
move(direction) {
let moved = false;
switch (direction) {
case 'up':
for (let c = 0; c < 4; c++) {
for (let r = 1; r < 4; r++) {
if (this.board[r][c] !== 0) {
let row = r;
while (row > 0 && this.board[row - 1][c] === 0) {
this.board[row - 1][c] = this.board[row][c];
this.board[row][c] = 0;
row--;
moved = true;
}
if (row > 0 && this.board[row - 1][c] === this.board[row][c]) {
this.board[row - 1][c] *= 2;
this.board[row][c] = 0;
this.score += this.board[row - 1][c];
moved = true;
}
}
}
}
break;
case 'down':
for (let c = 0; c < 4; c++) {
for (let r = 2; r >= 0; r--) {
if (this.board[r][c] !== 0) {
let row = r;
while (row < 3 && this.board[row + 1][c] === 0) {
this.board[row + 1][c] = this.board[row][c];
this.board[row][c] = 0;
row++;
moved = true;
}
if (row < 3 && this.board[row + 1][c] === this.board[row][c]) {
this.board[row + 1][c] *= 2;
this.board[row][c] = 0;
this.score += this.board[row + 1][c];
moved = true;
}
}
}
}
break;
case 'left':
for (let r = 0; r < 4; r++) {
for (let c = 1; c < 4; c++) {
if (this.board[r][c] !== 0) {
let col = c;
while (col > 0 && this.board[r][col - 1] === 0) {
this.board[r][col - 1] = this.board[r][col];
this.board[r][col] = 0;
col--;
moved = true;
}
if (col > 0 && this.board[r][col - 1] === this.board[r][col]) {
this.board[r][col - 1] *= 2;
this.board[r][col] = 0;
this.score += this.board[r][col - 1];
moved = true;
}
}
}
}
break;
case 'right':
for (let r = 0; r < 4; r++) {
for (let c = 2; c >= 0; c--) {
if (this.board[r][c] !== 0) {
let col = c;
while (col < 3 && this.board[r][col + 1] === 0) {
this.board[r][col + 1] = this.board[r][col];
this.board[r][col] = 0;
col++;
moved = true;
}
if (col < 3 && this.board[r][col + 1] === this.board[r][col]) {
this.board[r][col + 1] *= 2;
this.board[r][col] = 0;
this.score += this.board[r][col + 1];
moved = true;
}
}
}
}
break;
}
if (moved) {
this.addRandomTile();
}
}
```
"""
@pytest.fixture()
def llm():
return LLM()
@pytest.mark.asyncio
async def test_llm_code_review(llm):
choices = [
"Please review the move function code above. Should it be refactor?",
"Please implement the move function",
"Please write a draft for the move function in order to implement it",
]
# prompt = CODE_REVIEW_SMALLEST_CONTEXT+ "\n\n" + MOVE_DRAFT + "\n\n" + choices[1]
# rsp = await llm.aask(prompt)
prompt = CODE_REVIEW_SMALLEST_CONTEXT + "\n\n" + MOVE_FUNCTION + "\n\n" + choices[0]
prompt = FUNCTION_TO_MERMAID_CLASS
_ = await llm.aask(prompt)
# if __name__ == "__main__":
# pytest.main([__file__, "-s"])

View file

@ -7,11 +7,12 @@
@Modified By: mashenquan, 2023-11-1. In line with Chapter 2.2.1 and 2.2.2 of RFC 116, introduce unit tests for
the utilization of the new feature of `Message` class.
"""
import json
import pytest
from metagpt.actions import Action
from metagpt.actions.action_node import ActionNode
from metagpt.actions.write_code import WriteCode
from metagpt.schema import AIMessage, Message, SystemMessage, UserMessage
from metagpt.utils.common import any_to_str
@ -19,10 +20,10 @@ from metagpt.utils.common import any_to_str
def test_messages():
test_content = "test_message"
msgs = [
UserMessage(test_content),
SystemMessage(test_content),
AIMessage(test_content),
Message(test_content, role="QA"),
UserMessage(content=test_content),
SystemMessage(content=test_content),
AIMessage(content=test_content),
Message(content=test_content, role="QA"),
]
text = str(msgs)
roles = ["user", "system", "assistant", "QA"]
@ -30,7 +31,7 @@ def test_messages():
def test_message():
m = Message("a", role="v1")
m = Message(content="a", role="v1")
v = m.dump()
d = json.loads(v)
assert d
@ -43,7 +44,7 @@ def test_message():
assert m.content == "a"
assert m.role == "v2"
m = Message("a", role="b", cause_by="c", x="d", send_to="c")
m = Message(content="a", role="b", cause_by="c", x="d", send_to="c")
assert m.content == "a"
assert m.role == "b"
assert m.send_to == {"c"}
@ -60,12 +61,35 @@ def test_message():
def test_routes():
m = Message("a", role="b", cause_by="c", x="d", send_to="c")
m = Message(content="a", role="b", cause_by="c", x="d", send_to="c")
m.send_to = "b"
assert m.send_to == {"b"}
m.send_to = {"e", Action}
assert m.send_to == {"e", any_to_str(Action)}
if __name__ == "__main__":
pytest.main([__file__, "-s"])
def test_message_serdeser():
out_mapping = {"field3": (str, ...), "field4": (list[str], ...)}
out_data = {"field3": "field3 value3", "field4": ["field4 value1", "field4 value2"]}
ic_obj = ActionNode.create_model_class("code", out_mapping)
message = Message(content="code", instruct_content=ic_obj(**out_data), role="engineer", cause_by=WriteCode)
message_dict = message.dict()
assert message_dict["cause_by"] == "metagpt.actions.write_code.WriteCode"
assert message_dict["instruct_content"] == {
"class": "code",
"mapping": {"field3": "(<class 'str'>, Ellipsis)", "field4": "(list[str], Ellipsis)"},
"value": {"field3": "field3 value3", "field4": ["field4 value1", "field4 value2"]},
}
new_message = Message(**message_dict)
assert new_message.content == message.content
assert new_message.instruct_content == message.instruct_content
assert new_message.cause_by == message.cause_by
assert new_message.instruct_content.field3 == out_data["field3"]
message = Message(content="code")
message_dict = message.dict()
new_message = Message(**message_dict)
assert new_message.instruct_content is None
assert new_message.cause_by == "metagpt.actions.add_requirement.UserRequirement"

View file

@ -13,12 +13,12 @@ async def test_subscription_run():
async def trigger():
while True:
yield Message("the latest news about OpenAI")
yield Message(content="the latest news about OpenAI")
await asyncio.sleep(3600 * 24)
class MockRole(Role):
async def run(self, message=None):
return Message("")
return Message(content="")
async def callback(message):
nonlocal callback_done
@ -61,11 +61,11 @@ async def test_subscription_run():
async def test_subscription_run_error(loguru_caplog):
async def trigger1():
while True:
yield Message("the latest news about OpenAI")
yield Message(content="the latest news about OpenAI")
await asyncio.sleep(3600 * 24)
async def trigger2():
yield Message("the latest news about OpenAI")
yield Message(content="the latest news about OpenAI")
class MockRole1(Role):
async def run(self, message=None):
@ -73,7 +73,7 @@ async def test_subscription_run_error(loguru_caplog):
class MockRole2(Role):
async def run(self, message=None):
return Message("")
return Message(content="")
async def callback(msg: Message):
print(msg)

View file

@ -0,0 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : unittest of team
from metagpt.roles.project_manager import ProjectManager
from metagpt.team import Team
def test_team():
company = Team()
company.hire([ProjectManager()])
assert len(company.environment.roles) == 1

View file

@ -47,7 +47,7 @@ class TestGetProjectRoot:
Input(x=RunCode, want="metagpt.actions.run_code.RunCode"),
Input(x=RunCode(), want="metagpt.actions.run_code.RunCode"),
Input(x=Message, want="metagpt.schema.Message"),
Input(x=Message(""), want="metagpt.schema.Message"),
Input(x=Message(content=""), want="metagpt.schema.Message"),
Input(x="A", want="A"),
]
for i in inputs:

View file

@ -7,7 +7,7 @@
from typing import List, Tuple
from metagpt.actions import WritePRD
from metagpt.actions.action_output import ActionOutput
from metagpt.actions.action_node import ActionNode
from metagpt.schema import Message
from metagpt.utils.serialize import (
actionoutout_schema_to_mapping,
@ -54,7 +54,7 @@ def test_actionoutout_schema_to_mapping():
def test_serialize_and_deserialize_message():
out_mapping = {"field1": (str, ...), "field2": (List[str], ...)}
out_data = {"field1": "field1 value", "field2": ["field2 value1", "field2 value2"]}
ic_obj = ActionOutput.create_model_class("prd", out_mapping)
ic_obj = ActionNode.create_model_class("prd", out_mapping)
message = Message(
content="prd demand", instruct_content=ic_obj(**out_data), role="user", cause_by=WritePRD