Merge pull request #520 from better629/serialization

Serialization
This commit is contained in:
Sirui Hong 2023-12-02 00:09:09 +08:00 committed by GitHub
commit a69be36abf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
45 changed files with 972 additions and 574 deletions

View file

@ -5,8 +5,9 @@
@Author : alexanderwu
@File : action.py
"""
from __future__ import annotations
import re
from abc import ABC
from typing import Optional, Any
from pydantic import BaseModel, Field
@ -14,25 +15,49 @@ from tenacity import retry, stop_after_attempt, wait_fixed
from metagpt.actions.action_output import ActionOutput
from metagpt.llm import LLM
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.logs import logger
from metagpt.utils.common import OutputParser
from metagpt.utils.custom_decoder import CustomDecoder
from metagpt.utils.utils import import_class
action_subclass_registry = {}
class Action(BaseModel):
name: str = ""
llm: LLM = Field(default_factory=LLM)
llm: BaseGPTAPI = Field(default_factory=LLM, exclude=True)
context = ""
prefix = ""
profile = ""
desc = ""
content: Optional[str] = None
instruct_content: Optional[str] = None
# builtin variables
builtin_class_name: str = ""
class Config:
arbitrary_types_allowed = True
def __init__(self, **kwargs: Any):
super().__init__(**kwargs)
# deserialize child classes dynamically for inherited `action`
object.__setattr__(self, "builtin_class_name", self.__class__.__name__)
self.__fields__["builtin_class_name"].default = self.__class__.__name__
def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
action_subclass_registry[cls.__name__] = cls
def dict(self, *args, **kwargs) -> "DictStrAny":
obj_dict = super(Action, self).dict(*args, **kwargs)
if "llm" in obj_dict:
obj_dict.pop("llm")
return obj_dict
def set_prefix(self, prefix, profile):
"""Set prefix for later usage"""
self.prefix = prefix
@ -44,22 +69,8 @@ class Action(BaseModel):
def __repr__(self):
return self.__str__()
def serialize(self):
return {
"action_class": self.__class__.__name__,
"module_name": self.__module__,
"name": self.name
}
@classmethod
def deserialize(cls, action_dict: dict):
action_class_str = action_dict.pop("action_class")
module_name = action_dict.pop("module_name")
action_class = import_class(action_class_str, module_name)
return action_class(**action_dict)
@classmethod
def ser_class(cls):
def ser_class(cls) -> dict:
""" serialize class type"""
return {
"action_class": cls.__name__,

View file

@ -13,6 +13,7 @@ from pydantic import Field
from metagpt.actions import Action, ActionOutput
from metagpt.llm import LLM
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.config import CONFIG
from metagpt.const import WORKSPACE_ROOT
from metagpt.logs import logger
@ -155,12 +156,11 @@ OUTPUT_MAPPING = {
class WriteDesign(Action):
name: str = ""
context: Optional[str] = None
llm: LLM = Field(default_factory=LLM)
llm: BaseGPTAPI = Field(default_factory=LLM)
desc: str = "Based on the PRD, think about the system design, and design the corresponding APIs, "
"data structures, library tables, processes, and paths. Please provide your design, feedback "
"clearly and in detail."
def recreate_workspace(self, workspace: Path):
try:
shutil.rmtree(workspace)
@ -168,7 +168,6 @@ class WriteDesign(Action):
pass # Folder does not exist, but we don't care
workspace.mkdir(parents=True, exist_ok=True)
async def _save_prd(self, docs_path, resources_path, context):
prd_file = docs_path / "prd.md"
if context[-1].instruct_content and context[-1].instruct_content.dict()["Competitive Quadrant Chart"]:
@ -179,7 +178,6 @@ class WriteDesign(Action):
logger.info(f"Saving PRD to {prd_file}")
prd_file.write_text(json_to_markdown(context[-1].instruct_content.dict()))
async def _save_system_design(self, docs_path, resources_path, system_design):
data_api_design = system_design.instruct_content.dict()[
"Data structures and interface definitions"
@ -193,7 +191,6 @@ class WriteDesign(Action):
logger.info(f"Saving System Designs to {system_design_file}")
system_design_file.write_text((json_to_markdown(system_design.instruct_content.dict())))
async def _save(self, context, system_design):
if isinstance(system_design, ActionOutput):
ws_name = system_design.instruct_content.dict()["Python package name"]
@ -211,7 +208,6 @@ class WriteDesign(Action):
logger.error(f"Failed to save PRD {e}")
await self._save_system_design(docs_path, resources_path, system_design)
async def run(self, context, format=CONFIG.prompt_format):
prompt_template, format_example = get_template(templates, format)
prompt = prompt_template.format(context=context, format_example=format_example)

View file

@ -11,6 +11,7 @@ from pydantic import Field
from metagpt.actions.action import Action
from metagpt.llm import LLM
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.config import CONFIG
from metagpt.const import WORKSPACE_ROOT
from metagpt.utils.common import CodeParser
@ -168,7 +169,7 @@ OUTPUT_MAPPING = {
class WriteTasks(Action):
name: str = "CreateTasks"
context: Optional[str] = None
llm: LLM = Field(default_factory=LLM)
llm: BaseGPTAPI = Field(default_factory=LLM)
def _save(self, context, rsp):
try:

View file

@ -8,14 +8,15 @@
import pydantic
from typing import Optional, Any
from pydantic import BaseModel, Field
from pydantic import root_validator
from metagpt.actions import Action
from metagpt.llm import LLM
from metagpt.config import Config
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.config import Config, CONFIG
from metagpt.logs import logger
from metagpt.schema import Message
from metagpt.tools.search_engine import SearchEngine
from pydantic import root_validator
SEARCH_AND_SUMMARIZE_SYSTEM = """### Requirements
1. Please summarize the latest dialogue based on the reference information (secondary) and dialogue history (primary). Do not include text that is irrelevant to the conversation.
@ -106,35 +107,31 @@ You are a member of a professional butler team and will provide helpful suggesti
class SearchAndSummarize(Action):
name: str = ""
content: Optional[str] = None
llm: None = Field(default_factory=LLM)
llm: BaseGPTAPI = Field(default_factory=LLM)
config: None = Field(default_factory=Config)
engine: Optional[str] = None
engine: Optional[str] = CONFIG.search_engine
search_func: Optional[str] = None
search_engine: SearchEngine = None
result = ""
@root_validator
def validate_engine_and_run_func(cls, values):
engine = values.get('engine')
search_func = values.get('search_func')
engine = values.get("engine")
search_func = values.get("search_func")
config = Config()
if engine is None:
engine = config.search_engine
config_data = {
'engine': engine,
'run_func': search_func
}
search_engine = SearchEngine(**config_data)
try:
search_engine = SearchEngine(engine=engine, run_func=search_func)
except pydantic.ValidationError:
search_engine = None
values['search_engine'] = search_engine
values["search_engine"] = search_engine
return values
async def run(self, context: list[Message], system_text=SEARCH_AND_SUMMARIZE_SYSTEM) -> str:
print(context)
if self.search_engine is None:
logger.warning("Configure one of SERPAPI_API_KEY, SERPER_API_KEY, GOOGLE_API_KEY to unlock full feature")
return ""

View file

@ -13,6 +13,7 @@ from tenacity import retry, stop_after_attempt, wait_fixed
from metagpt.actions import WriteDesign
from metagpt.actions.action import Action
from metagpt.llm import LLM
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.const import WORKSPACE_ROOT
from metagpt.logs import logger
from metagpt.schema import Message
@ -50,7 +51,7 @@ ATTENTION: Use '##' to SPLIT SECTIONS, not '#'. Output format carefully referenc
class WriteCode(Action):
name: str = "WriteCode"
context: Optional[str] = None
llm: LLM = Field(default_factory=LLM)
llm: BaseGPTAPI = Field(default_factory=LLM)
def _is_invalid(self, filename):
return any(i in filename for i in ["mp3", "wav"])

View file

@ -12,7 +12,7 @@ from tenacity import retry, stop_after_attempt, wait_fixed
from metagpt.llm import LLM
from metagpt.actions.action import Action
from metagpt.logs import logger
from metagpt.schema import Message
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.utils.common import CodeParser
PROMPT_TEMPLATE = """
@ -67,7 +67,7 @@ FORMAT_EXAMPLE = """
class WriteCodeReview(Action):
name: str = "WriteCodeReview"
context: Optional[str] = None
llm: LLM = Field(default_factory=LLM)
llm: BaseGPTAPI = Field(default_factory=LLM)
@retry(stop=stop_after_attempt(2), wait=wait_fixed(1))
async def write_code(self, prompt):

View file

@ -11,6 +11,7 @@ from pydantic import BaseModel, Field
from metagpt.actions import Action, ActionOutput
from metagpt.llm import LLM
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.actions.search_and_summarize import SearchAndSummarize
from metagpt.config import CONFIG
from metagpt.logs import logger
@ -224,21 +225,15 @@ OUTPUT_MAPPING = {
class WritePRD(Action):
name: str = ""
content: Optional[str] = None
llm: LLM = Field(default_factory=LLM)
assistant_search_action: Action = None
def __init__(self, **kwargs):
super().__init__(**kwargs)
llm: BaseGPTAPI = Field(default_factory=LLM)
async def run(self, requirements, format=CONFIG.prompt_format, *args, **kwargs) -> ActionOutput:
# self.assistant_search_action = SearchAndSummarize()
if self.assistant_search_action is None:
self.assistant_search_action = SearchAndSummarize()
# self.assistant_search_action = SearchAndSummarize()
rsp = await self.assistant_search_action.run(context=requirements)
info = f"### Search Results\n{self.assistant_search_action.result}\n\n### Search Summary\n{rsp}"
if self.assistant_search_action.result:
logger.info(self.assistant_search_action.result)
sas = SearchAndSummarize()
# rsp = await sas.run(context=requirements, system_text=SEARCH_AND_SUMMARIZE_SYSTEM_EN_US)
rsp = ""
info = f"### Search Results\n{sas.result}\n\n### Search Summary\n{rsp}"
if sas.result:
logger.info(sas.result)
logger.info(rsp)
prompt_template, format_example = get_template(templates, format)

View file

@ -5,6 +5,12 @@
@Author : alexanderwu
@File : environment.py
"""
from typing import Optional
from pydantic import Field
from metagpt.llm import LLM
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.actions.action import Action
from metagpt.logs import logger
from metagpt.utils.common import CodeParser
@ -31,8 +37,9 @@ you should correctly import the necessary classes based on these file locations!
class WriteTest(Action):
def __init__(self, name="WriteTest", context=None, llm=None):
super().__init__(name, context, llm)
name: str = "WriteTest"
context: Optional[str] = None
llm: BaseGPTAPI = Field(default_factory=LLM)
async def write_code(self, prompt):
code_rsp = await self._aask(prompt)

View file

@ -42,7 +42,7 @@ TMP = PROJECT_ROOT / "tmp"
RESEARCH_PATH = DATA_PATH / "research"
TUTORIAL_PATH = DATA_PATH / "tutorial_docx"
INVOICE_OCR_TABLE_PATH = DATA_PATH / "invoice_table"
SERDES_PATH = WORKSPACE_ROOT / "storage" # TODO to store `storage` under the individual generated project
SERDESER_PATH = WORKSPACE_ROOT / "storage" # TODO to store `storage` under the individual generated project
SKILL_DIRECTORY = PROJECT_ROOT / "metagpt/skills"

View file

@ -12,7 +12,7 @@ from pathlib import Path
from pydantic import BaseModel, Field
from metagpt.memory import Memory
from metagpt.roles import Role
from metagpt.roles.role import Role, role_subclass_registry
from metagpt.schema import Message
from metagpt.utils.utils import read_json_file, write_json_file
@ -30,6 +30,23 @@ class Environment(BaseModel):
class Config:
arbitrary_types_allowed = True
def __init__(self, **kwargs):
roles = []
for role_key, role in kwargs.get("roles", {}).items():
current_role = kwargs["roles"][role_key]
if isinstance(current_role, dict):
item_class_name = current_role.get("builtin_class_name", None)
for name, subclass in role_subclass_registry.items():
registery_class_name = subclass.__fields__["builtin_class_name"].default
if item_class_name == registery_class_name:
current_role = subclass(**current_role)
break
kwargs["roles"][role_key] = current_role
roles.append(current_role)
super().__init__(**kwargs)
self.add_roles(roles) # add_roles again to init the Role.set_env
def serialize(self, stg_path: Path):
roles_path = stg_path.joinpath("roles.json")
roles_info = []
@ -46,33 +63,37 @@ class Environment(BaseModel):
history_path = stg_path.joinpath("history.json")
write_json_file(history_path, {"content": self.history})
def deserialize(self, stg_path: Path):
@classmethod
def deserialize(cls, stg_path: Path) -> "Environment":
""" stg_path: ./storage/team/environment/ """
roles_path = stg_path.joinpath("roles.json")
roles_info = read_json_file(roles_path)
roles = []
for role_info in roles_info:
role_class = role_info.get("role_class")
role_name = role_info.get("role_name")
role_path = stg_path.joinpath(f"roles/{role_class}_{role_name}")
# role stored in ./environment/roles/{role_class}_{role_name}
role_path = stg_path.joinpath(f'roles/{role_info.get("role_class")}_{role_info.get("role_name")}')
role = Role.deserialize(role_path)
self.add_role(role)
roles.append(role)
memory = Memory.deserialize(stg_path)
self.memory = memory
history_path = stg_path.joinpath("history.json")
history = read_json_file(history_path)
self.history = history.get("content")
history = read_json_file(stg_path.joinpath("history.json"))
history = history.get("content")
environment = Environment(**{
"memory": memory,
"history": history
})
environment.add_roles(roles)
return environment
def add_role(self, role: Role):
"""增加一个在当前环境的角色, 默认为profile/role_profile
"""增加一个在当前环境的角色, 默认为profile
Add a role in the current environment
"""
role.set_env(self)
# use alias
self.roles[role.role_profile] = role
self.roles[role.profile] = role
def add_roles(self, roles: Iterable[Role]):
"""增加一批在当前环境的角色

View file

@ -2,6 +2,9 @@
# -*- coding: utf-8 -*-
# @Desc : the implement of Long-term memory
from typing import Optional
from pydantic import Field
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.memory.memory_storage import MemoryStorage
@ -15,11 +18,12 @@ class LongTermMemory(Memory):
- update memory when it changed
"""
def __init__(self):
self.memory_storage: MemoryStorage = MemoryStorage()
super(LongTermMemory, self).__init__()
self.rc = None # RoleContext
self.msg_from_recover = False
memory_storage: MemoryStorage = Field(default_factory=MemoryStorage)
rc: Optional["RoleContext"] = None
msg_from_recover: bool = False
class Config:
arbitrary_types_allowed = True
def recover_memory(self, role_id: str, rc: "RoleContext"):
messages = self.memory_storage.recover_memory(role_id)

View file

@ -5,34 +5,51 @@
@Author : alexanderwu
@File : memory.py
"""
import copy
from collections import defaultdict
from typing import Iterable, Type
from typing import Iterable, Type, Union, Optional
from pathlib import Path
from pydantic import BaseModel, Field
import json
from metagpt.actions import Action
from metagpt.schema import Message
from metagpt.utils.utils import read_json_file, write_json_file
from metagpt.utils.serialize import serialize_general_message, deserialize_general_message
from metagpt.utils.utils import import_class
class Memory:
class Memory(BaseModel):
"""The most basic memory: super-memory"""
def __init__(self):
"""Initialize an empty storage list and an empty index dictionary"""
self.storage: list[Message] = []
self.index: dict[Type[Action], list[Message]] = defaultdict(list)
storage: list[Message] = Field(default=[])
index: dict[Type[Action], list[Message]] = Field(default_factory=defaultdict(list))
def __init__(self, **kwargs):
index = kwargs.get("index", {})
new_index = defaultdict(list)
for action_str, value in index.items():
action_dict = json.loads(action_str)
action_class = import_class("Action", "metagpt.actions.action")
action_obj = action_class.deser_class(action_dict)
new_index[action_obj] = [Message(**item_dict) for item_dict in value]
kwargs["index"] = new_index
super(Memory, self).__init__(**kwargs)
self.index = new_index
def dict(self, *args, **kwargs) -> "DictStrAny":
""" overwrite the `dict` to dump dynamic pydantic model"""
obj_dict = super(Memory, self).dict(*args, **kwargs)
new_obj_dict = copy.deepcopy(obj_dict)
new_obj_dict["index"] = {}
for action, value in obj_dict["index"].items():
action_ser = json.dumps(action.ser_class())
new_obj_dict["index"][action_ser] = value
return new_obj_dict
def serialize(self, stg_path: Path):
""" stg_path = ./storage/team/environment/ or ./storage/team/environment/roles/{role_class}_{role_name}/ """
memory_path = stg_path.joinpath("memory.json")
storage = []
for message in self.storage:
# msg_dict = message.serialize()
msg_dict = serialize_general_message(message)
storage.append(msg_dict)
storage = self.dict()
write_json_file(memory_path, storage)
@classmethod
@ -40,13 +57,8 @@ class Memory:
""" stg_path = ./storage/team/environment/ or ./storage/team/environment/roles/{role_class}_{role_name}/"""
memory_path = stg_path.joinpath("memory.json")
memory = Memory()
memory_list = read_json_file(memory_path)
for message in memory_list:
# distinguish instruct_content type in message
# msg = Message.deserialize(message)
msg = deserialize_general_message(message)
memory.add(msg)
memory_dict = read_json_file(memory_path)
memory = Memory(**memory_dict)
return memory
@ -70,6 +82,16 @@ class Memory:
"""Return all messages containing a specified content"""
return [message for message in self.storage if content in message.content]
def delete_newest(self) -> "Message":
""" delete the newest message from the storage"""
if len(self.storage) > 0:
newest_msg = self.storage.pop()
if newest_msg.cause_by and newest_msg in self.index[newest_msg.cause_by]:
self.index[newest_msg.cause_by].remove(newest_msg)
else:
newest_msg = None
return newest_msg
def delete(self, message: Message):
"""Delete the specified message from storage, while updating the index"""
self.storage.remove(message)
@ -115,4 +137,3 @@ class Memory:
continue
rsp += self.index[action]
return rsp

View file

@ -22,8 +22,8 @@ class Architect(Role):
goal (str): Primary goal or responsibility of the architect.
constraints (str): Constraints or guidelines for the architect.
"""
name: str = "Bob"
role_profile: str = Field(default="Architect" , alias='profile')
name: str = Field(default="Bob")
profile: str = Field(default="Architect")
goal: str = "Design a concise, usable, complete python system"
constraints: str = "Try to specify good open source tools as much as possible"

View file

@ -5,6 +5,9 @@
@Author : alexanderwu
@File : sales.py
"""
from typing import Optional
from pydantic import Field
from metagpt.roles import Sales
# from metagpt.actions import SearchAndSummarize
@ -24,12 +27,14 @@ DESC = """
class CustomerService(Sales):
name: str = Field(default="Xiaomei")
profile: str = Field(default="Human customer service")
desc: str = DESC,
store: Optional[str] = None
def __init__(
self,
name="Xiaomei",
profile="Human customer service",
desc=DESC,
store=None
):
super().__init__(name, profile, desc=desc, store=store)
**kwargs):
super().__init__(**kwargs)

View file

@ -60,8 +60,8 @@ class Engineer(Role):
use_code_review (bool): Whether to use code review.
todos (list): List of tasks.
"""
name: str = "Alex"
role_profile: str = Field(default="Engineer", alias='profile')
name: str = Field(default="Alex")
profile: str = Field(default="Engineer")
goal: str = "Write elegant, readable, extensible, efficient code"
constraints: str = "The code should conform to standards like PEP8 and be modular and maintainable"
n_borg: int = 1

View file

@ -21,10 +21,11 @@ class ProductManager(Role):
goal (str): Goal of the product manager.
constraints (str): Constraints or limitations for the product manager.
"""
name: str = "Alice"
role_profile: str = Field(default="Product Manager", alias='profile')
name: str = Field(default="Alice")
profile: str = Field(default="Product Manager")
goal: str = "Efficiently create a successful product"
constraints: str = ""
"""
Represents a Product Manager role responsible for product development and management.
"""

View file

@ -22,8 +22,8 @@ class ProjectManager(Role):
goal (str): Goal of the project manager.
constraints (str): Constraints or limitations for the project manager.
"""
name: str = "Eve"
role_profile: str = Field(default="Project Manager", alias='profile')
name: str = Field(default="Eve")
profile: str = Field(default="Project Manager")
goal: str = "Improve team efficiency and deliver with quality and quantity"
constraints: str = ""

View file

@ -7,6 +7,7 @@
"""
import os
from pathlib import Path
from pydantic import Field
from metagpt.actions import (
DebugError,
@ -25,21 +26,22 @@ from metagpt.utils.special_tokens import FILENAME_CODE_SEP, MSG_SEP
class QaEngineer(Role):
name: str = Field(default="Edward")
profile: str = Field(default="QaEngineer")
goal: str = "Write comprehensive and robust tests to ensure codes will work as expected without bugs"
constraints: str = "The test code you write should conform to code standard like PEP8, be modular, easy to read and maintain"
test_round_allowed: int = 5
def __init__(
self,
name="Edward",
profile="QaEngineer",
goal="Write comprehensive and robust tests to ensure codes will work as expected without bugs",
constraints="The test code you write should conform to code standard like PEP8, be modular, easy to read and maintain",
test_round_allowed=5,
**kwargs
):
super().__init__(name, profile, goal, constraints)
super().__init__(**kwargs)
self._init_actions(
[WriteTest]
) # FIXME: a bit hack here, only init one action to circumvent _think() logic, will overwrite _think() in future updates
self._watch([WriteCode, WriteCodeReview, WriteTest, RunCode, DebugError])
self.test_round = 0
self.test_round_allowed = test_round_allowed
@classmethod
def parse_workspace(cls, system_design_msg: Message) -> str:

View file

@ -6,27 +6,29 @@
@File : role.py
"""
from __future__ import annotations
from enum import Enum
from pathlib import Path
from __future__ import annotations
from typing import (
Iterable,
Type
Type,
Any
)
import re
from pydantic import BaseModel, Field
from importlib import import_module
from pydantic import BaseModel, Field, validator
# from metagpt.environment import Environment
from metagpt.config import CONFIG
from metagpt.actions import Action, ActionOutput
from metagpt.actions.action import Action, ActionOutput, action_subclass_registry
from metagpt.llm import LLM
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.logs import logger
from metagpt.memory import Memory, LongTermMemory
from metagpt.schema import Message
from metagpt.provider.human_provider import HumanProvider
from metagpt.utils.utils import read_json_file, write_json_file, import_class
from metagpt.utils.utils import read_json_file, write_json_file, import_class, role_raise_decorator
from metagpt.const import SERDESER_PATH
PREFIX_TEMPLATE = """You are a {profile}, named {name}, your goal is {goal}, and the constraint is {constraints}. """
@ -57,6 +59,7 @@ ROLE_TEMPLATE = """Your response should be based on the previous conversation hi
{name}: {result}
"""
class RoleReactMode(str, Enum):
REACT = "react"
BY_ORDER = "by_order"
@ -74,6 +77,7 @@ class RoleSetting(BaseModel):
goal: str = ""
constraints: str = ""
desc: str = ""
is_human: bool = False
def __str__(self):
return f"{self.name}({self.profile})"
@ -84,19 +88,38 @@ class RoleSetting(BaseModel):
class RoleContext(BaseModel):
"""Role Runtime Context"""
env: 'Environment' = Field(default=None)
# # env exclude=True to avoid `RecursionError: maximum recursion depth exceeded in comparison`
env: "Environment" = Field(default=None, exclude=True)
memory: Memory = Field(default_factory=Memory)
long_term_memory: LongTermMemory = Field(default_factory=LongTermMemory)
state: int = Field(default=0)
todo: Action = Field(default=None)
long_term_memory: LongTermMemory = Field(default_factory=LongTermMemory, exclude=True) # TODO not used now
state: int = Field(default=-1) # -1 indicates initial or termination state where todo is None
todo: Action = Field(default=None, exclude=True)
watch: set[Type[Action]] = Field(default_factory=set)
news: list[Type[Message]] = Field(default=[])
news: list[Type[Message]] = Field(default=[], exclude=True) # TODO not used
react_mode: RoleReactMode = RoleReactMode.REACT # see `Role._set_react_mode` for definitions of the following two attributes
max_react_loop: int = 1
class Config:
arbitrary_types_allowed = True
def __init__(self, **kwargs):
watch_info = kwargs.get("watch", set())
watch = set()
for item in watch_info:
action = Action.deser_class(item)
watch.update([action])
kwargs["watch"] = watch
super(RoleContext, self).__init__(**kwargs)
def dict(self, *args, **kwargs) -> "DictStrAny":
obj_dict = super(RoleContext, self).dict(*args, **kwargs)
watch = obj_dict.get("watch", set())
watch_info = []
for item in watch:
watch_info.append(item.ser_class())
obj_dict["watch"] = watch_info
return obj_dict
def check(self, role_id: str):
if hasattr(CONFIG, "long_term_memory") and CONFIG.long_term_memory:
self.long_term_memory.recover_memory(role_id, self)
@ -112,85 +135,102 @@ class RoleContext(BaseModel):
return self.memory.get()
role_subclass_registry = {}
class Role(BaseModel):
"""Role/Agent"""
name: str = ""
profile: str = ""
goal: str = ""
constraints: str = ""
desc: str = ""
_setting: RoleSetting = Field(default_factory=RoleSetting, alias="_setting")
_setting = RoleSetting(name=name, profile=profile, goal=goal, constraints=constraints)
is_human: bool = False
_llm: BaseGPTAPI = Field(default_factory=LLM)
_role_id: str = ""
_states: list = Field(default=[])
_actions: list = Field(default=[])
_actions_type: list = Field(default=[])
_rc: RoleContext = RoleContext()
_states: list[str] = Field(default=[])
_actions: list[Action] = Field(default=[])
_rc: RoleContext = Field(default=RoleContext)
# builtin variables
recovered: bool = False # to tag if a recovered role
builtin_class_name: str = ""
_private_attributes = {
"_setting": _setting,
"_llm": LLM() if not is_human else HumanProvider(),
"_role_id": _role_id,
"_states": [],
"_actions": [],
"_actions_type": [] # 用于记录和序列化
"_rc": RoleContext()
}
class Config:
arbitrary_types_allowed = True
def __init__(self, **kwargs):
exclude = ["_llm"]
def __init__(self, **kwargs: Any):
for index in range(len(kwargs.get("_actions", []))):
current_action = kwargs["_actions"][index]
if isinstance(current_action, dict):
item_class_name = current_action.get("builtin_class_name", None)
for name, subclass in action_subclass_registry.items():
registery_class_name = subclass.__fields__["builtin_class_name"].default
if item_class_name == registery_class_name:
current_action = subclass(**current_action)
break
kwargs["_actions"][index] = current_action
super().__init__(**kwargs)
# 关于私有变量的初始化 https://github.com/pydantic/pydantic/issues/655
self._private_attributes["_llm"] = LLM() if not self.is_human else HumanProvider()
self._private_attributes["_role_id"] = str(self._setting)
for key in self._private_attributes.keys():
if key in kwargs:
object.__setattr__(self, key, kwargs[key])
if key =="_setting":
_setting = RoleSetting(**kwargs[key])
object.__setattr__(self, '_setting', _setting)
elif key == "_rc":
_rc = RoleContext
object.__setattr__(self, '_rc', _rc)
if key == "_rc":
_rc = RoleContext(**kwargs["_rc"])
object.__setattr__(self, "_rc", _rc)
else:
object.__setattr__(self, key, self._private_attributes[key])
if key == "_rc":
# # Warning, if use self._private_attributes["_rc"],
# # self._rc will be a shared object between roles, so init one or reset it inside `_reset`
object.__setattr__(self, key, RoleContext())
else:
object.__setattr__(self, key, self._private_attributes[key])
# deserialize child classes dynamically for inherited `role`
object.__setattr__(self, "builtin_class_name", self.__class__.__name__)
self.__fields__["builtin_class_name"].default = self.__class__.__name__
def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
role_subclass_registry[cls.__name__] = cls
def _reset(self):
object.__setattr__(self, '_states', [])
object.__setattr__(self, '_actions', [])
object.__setattr__(self, "_states", [])
object.__setattr__(self, "_actions", [])
# object.__setattr__(self, "_rc", RoleContext())
def serialize(self, stg_path: Path):
role_info_path = stg_path.joinpath("role_info.json")
role_info = {
@property
def _setting(self):
return f"{self.name}({self.profile})"
def serialize(self, stg_path: Path = None):
stg_path = SERDESER_PATH.joinpath(f"team/environment/roles/{self.__class__.__name__}_{self.name}") \
if stg_path is None else stg_path
role_info = self.dict(exclude={"_rc": {"memory": True}, "_llm": True})
role_info.update({
"role_class": self.__class__.__name__,
"module_name": self.__module__
}
setting = self._setting.dict()
setting.pop("desc")
setting.pop("is_human") # not all inherited roles have this atrr
role_info.update(setting)
})
role_info_path = stg_path.joinpath("role_info.json")
write_json_file(role_info_path, role_info)
actions_info_path = stg_path.joinpath("actions/actions_info.json")
actions_info = []
for action in self._actions:
actions_info.append(action.serialize())
write_json_file(actions_info_path, actions_info)
watches_info_path = stg_path.joinpath("watches/watches_info.json")
watches_info = []
for watch in self._rc.watch:
watches_info.append(watch.ser_class())
write_json_file(watches_info_path, watches_info)
actions_todo_path = stg_path.joinpath("actions/todo.json")
actions_todo = {
"cur_state": self._rc.state,
"react_mode": self._rc.react_mode.value,
"max_react_loop": self._rc.max_react_loop
}
write_json_file(actions_todo_path, actions_todo)
self._rc.memory.serialize(stg_path)
self._rc.memory.serialize(stg_path) # serialize role's memory alone
@classmethod
def deserialize(cls, stg_path: Path) -> "Role":
@ -203,47 +243,15 @@ class Role(BaseModel):
role_class = import_class(class_name=role_class_str, module_name=module_name)
role = role_class(**role_info) # initiate particular Role
actions_info_path = stg_path.joinpath("actions/actions_info.json")
actions = []
actions_info = read_json_file(actions_info_path)
for action_info in actions_info:
action = Action.deserialize(action_info)
actions.append(action)
watches_info_path = stg_path.joinpath("watches/watches_info.json")
watches = []
watches_info = read_json_file(watches_info_path)
for watch_info in watches_info:
action = Action.deser_class(watch_info)
watches.append(action)
role.init_actions(actions)
role.watch(watches)
actions_todo_path = stg_path.joinpath("actions/todo.json")
# recover self._rc.state
actions_todo = read_json_file(actions_todo_path)
max_react_loop = actions_todo.get("max_react_loop", 1)
cur_state = actions_todo.get("cur_state", -1)
role.set_state(cur_state)
role.set_recovered(True)
react_mode_str = actions_todo.get("react_mode", RoleReactMode.REACT.value)
if react_mode_str not in RoleReactMode.values():
logger.warning(f"ReactMode: {react_mode_str} not in {RoleReactMode.values()}, use react as default")
react_mode_str = RoleReactMode.REACT.value
role.set_react_mode(RoleReactMode(react_mode_str), max_react_loop)
role.set_recovered(True) # set True to make a tag
role_memory = Memory.deserialize(stg_path)
role.set_memory(role_memory)
return role
def _reset(self):
self._states = []
self._actions = []
def set_recovered(self, recovered: bool = False):
self._recovered = recovered
self.recovered = recovered
def set_memory(self, memory: Memory):
self._rc.memory = memory
@ -256,17 +264,15 @@ class Role(BaseModel):
for idx, action in enumerate(actions):
if not isinstance(action, Action):
## 默认初始化
i = action("", llm=self._llm)
i = action(llm=self._llm)
else:
if self._setting.is_human and not isinstance(action.llm, HumanProvider):
if self.is_human and not isinstance(action.llm, HumanProvider):
logger.warning(f"is_human attribute does not take effect,"
f"as Role's {str(action)} was initialized using LLM, try passing in Action classes instead of initialized instances")
i = action
i.set_prefix(self._get_prefix(), self.profile)
self._actions.append(i)
self._states.append(f"{idx}. {action}")
action_title = action.schema()["title"]
self._actions_type.append(action_title)
def set_react_mode(self, react_mode: RoleReactMode, max_react_loop: int = 1):
self._set_react_mode(react_mode, max_react_loop)
@ -310,24 +316,20 @@ class Role(BaseModel):
logger.debug(self._actions)
self._rc.todo = self._actions[self._rc.state] if state >= 0 else None
def set_env(self, env: 'Environment'):
def set_env(self, env: "Environment"):
"""Set the environment in which the role works. The role can talk to the environment and can also receive messages by observing."""
self._rc.env = env
@property
def name(self):
return self._setting.name
@property
def profile(self):
"""Get the role description (position)"""
return self._setting.profile
def _get_prefix(self):
"""Get the role prefix"""
if self._setting.desc:
return self._setting.desc
return PREFIX_TEMPLATE.format(**self._setting.dict())
if self.desc:
return self.desc
return PREFIX_TEMPLATE.format(**{
"profile": self.profile,
"name": self.name,
"goal": self.goal,
"constraints": self.constraints
})
async def _think(self) -> None:
"""Think about what to do and decide on the next action"""
@ -335,9 +337,9 @@ class Role(BaseModel):
# If there is only one action, then only this one can be performed
self._set_state(0)
return
if self._recovered and self._rc.state >= 0:
if self.recovered and self._rc.state >= 0:
self._set_state(self._rc.state) # action to run from recovered state
self._recovered = False # avoid max_react_loop out of work
self.recovered = False # avoid max_react_loop out of work
return
prompt = self._get_prefix()
@ -347,7 +349,7 @@ class Role(BaseModel):
logger.debug(f"{prompt=}")
if (not next_state.isdigit() and next_state != "-1") \
or int(next_state) not in range(-1, len(self._states)):
logger.warning(f'Invalid answer of state, {next_state=}, will be set to -1')
logger.warning(f"Invalid answer of state, {next_state=}, will be set to -1")
next_state = -1
else:
next_state = int(next_state)
@ -384,7 +386,7 @@ class Role(BaseModel):
news_text = [f"{i.role}: {i.content[:20]}..." for i in self._rc.news]
if news_text:
logger.debug(f'{self._setting} observed: {news_text}')
logger.debug(f"{self._setting} observed: {news_text}")
return len(self._rc.news)
def _publish_message(self, msg):
@ -400,7 +402,7 @@ class Role(BaseModel):
Use llm to select actions in _think dynamically
"""
actions_taken = 0
rsp = Message("No actions taken yet") # will be overwritten after Role _act
rsp = Message(content="No actions taken yet") # will be overwritten after Role _act
while actions_taken < self._rc.max_react_loop:
# think
await self._think()
@ -410,7 +412,7 @@ class Role(BaseModel):
logger.debug(f"{self._setting}: {self._rc.state=}, will do {self._rc.todo}")
rsp = await self._act()
actions_taken += 1
return rsp # return output from the last action
return rsp # return output from the last action
async def _act_by_order(self) -> Message:
"""switch action each time by order defined in _init_actions, i.e. _act (Action1) -> _act (Action2) -> ..."""
@ -454,7 +456,8 @@ class Role(BaseModel):
def get_memories(self, k=0) -> list[Message]:
"""A wrapper to return the most recent k memories of this role, return all when k=0"""
return self._rc.memory.get(k=k)
@role_raise_decorator
async def run(self, message=None):
"""Observe, and think and act based on the results of the observation"""
if message:

View file

@ -5,26 +5,34 @@
@Author : alexanderwu
@File : sales.py
"""
from typing import Optional
from pydantic import Field
from metagpt.actions import SearchAndSummarize
from metagpt.roles import Role
from metagpt.tools import SearchEngineType
class Sales(Role):
name: str = Field(default="Xiaomei")
profile: str = Field(default="Retail sales guide")
desc: str = "I am a sales guide in retail. My name is Xiaomei. I will answer some customer questions next, and I "
"will answer questions only based on the information in the knowledge base."
"If I feel that you can't get the answer from the reference material, then I will directly reply that"
" I don't know, and I won't tell you that this is from the knowledge base,"
"but pretend to be what I know. Note that each of my replies will be replied in the tone of a "
"professional guide",
store: Optional[str] = None
def __init__(
self,
name="Xiaomei",
profile="Retail sales guide",
desc="I am a sales guide in retail. My name is Xiaomei. I will answer some customer questions next, and I "
"will answer questions only based on the information in the knowledge base."
"If I feel that you can't get the answer from the reference material, then I will directly reply that"
" I don't know, and I won't tell you that this is from the knowledge base,"
"but pretend to be what I know. Note that each of my replies will be replied in the tone of a "
"professional guide",
store=None
**kwargs
):
super().__init__(name, profile, desc=desc)
self._set_store(store)
super().__init__(**kwargs)
self._set_store(self.store)
def _set_store(self, store):
if store:
@ -32,4 +40,3 @@ class Sales(Role):
else:
action = SearchAndSummarize()
self._init_actions([action])

View file

@ -5,6 +5,9 @@
@Author : alexanderwu
@File : seacher.py
"""
from pydantic import Field
from metagpt.actions import ActionOutput, SearchAndSummarize
from metagpt.logs import logger
from metagpt.roles import Role
@ -23,14 +26,14 @@ class Searcher(Role):
constraints (str): Constraints or limitations for the searcher.
engine (SearchEngineType): The type of search engine to use.
"""
name: str = Field(default="Alice")
profile: str = Field(default="Smart Assistant")
goal: str = "Provide search services for users"
constraints: str = "Answer is rich and complete"
engine: SearchEngineType = SearchEngineType.SERPAPI_GOOGLE
def __init__(self,
name: str = 'Alice',
profile: str = 'Smart Assistant',
goal: str = 'Provide search services for users',
constraints: str = 'Answer is rich and complete',
engine=SearchEngineType.SERPAPI_GOOGLE,
**kwargs) -> None:
def __init__(self, **kwargs) -> None:
"""
Initializes the Searcher role with given attributes.
@ -41,8 +44,8 @@ class Searcher(Role):
constraints (str): Constraints or limitations for the searcher.
engine (SearchEngineType): The type of search engine to use.
"""
super().__init__(name, profile, goal, constraints, **kwargs)
self._init_actions([SearchAndSummarize(engine=engine)])
super().__init__(**kwargs)
self._init_actions([SearchAndSummarize(engine=self.engine)])
def set_search_func(self, search_func):
"""Sets a custom search function for the searcher."""

View file

@ -5,18 +5,17 @@
@Author : alexanderwu
@File : schema.py
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Type, TypedDict
import copy
from typing import Type, TypedDict, Union, Optional
from pydantic import BaseModel
from pydantic import BaseModel, Field
from pydantic.main import ModelMetaclass
from metagpt.logs import logger
# from metagpt.utils.serialize import actionoutout_schema_to_mapping
# from metagpt.actions.action_output import ActionOutput
# from metagpt.actions.action import Action
from metagpt.utils.serialize import actionoutout_schema_to_mapping, actionoutput_mapping_to_str, \
actionoutput_str_to_mapping
from metagpt.utils.utils import import_class
class RawMessage(TypedDict):
@ -24,16 +23,45 @@ class RawMessage(TypedDict):
role: str
@dataclass
class Message:
"""list[<role>: <content>]"""
content: str
instruct_content: BaseModel = field(default=None)
role: str = field(default='user') # system / user / assistant
cause_by: Type["Action"] = field(default="")
sent_from: str = field(default="")
send_to: str = field(default="")
restricted_to: str = field(default="")
class Message(BaseModel):
content: str = ""
instruct_content: BaseModel = Field(default=None)
role: str = "user" # system / user / assistant
cause_by: Type["Action"] = Field(default=None)
sent_from: str = ""
send_to: str = ""
restricted_to: str = ""
def __init__(self, **kwargs):
instruct_content = kwargs.get("instruct_content", None)
cause_by = kwargs.get("cause_by", None)
if instruct_content and not isinstance(instruct_content, BaseModel):
ic = instruct_content
mapping = actionoutput_str_to_mapping(ic["mapping"])
actionoutput_class = import_class("ActionOutput", "metagpt.actions.action_output")
ic_obj = actionoutput_class.create_model_class(class_name=ic["class"], mapping=mapping)
ic_new = ic_obj(**ic["value"])
kwargs["instruct_content"] = ic_new
if cause_by and not isinstance(cause_by, ModelMetaclass):
action_class = import_class("Action", "metagpt.actions.action")
kwargs["cause_by"] = action_class.deser_class(cause_by)
super(Message, self).__init__(**kwargs)
def dict(self, *args, **kwargs) -> "DictStrAny":
""" overwrite the `dict` to dump dynamic pydantic model"""
obj_dict = super(Message, self).dict(*args, **kwargs)
ic = self.instruct_content # deal custom-defined action
if ic:
schema = ic.schema()
mapping = actionoutout_schema_to_mapping(schema)
mapping = actionoutput_mapping_to_str(mapping)
obj_dict["instruct_content"] = {"class": schema["title"], "mapping": mapping, "value": ic.dict()}
cb = self.cause_by
if cb:
obj_dict["cause_by"] = cb.ser_class()
return obj_dict
def __str__(self):
# prefix = '-'.join([self.role, str(self.cause_by)])
@ -42,46 +70,6 @@ class Message:
def __repr__(self):
return self.__str__()
# def serialize(self):
# message_cp: Message = copy.deepcopy(self)
# ic = message_cp.instruct_content
# if ic:
# # model create by pydantic create_model like `pydantic.main.prd`, can't pickle.dump directly
# schema = ic.schema()
# mapping = actionoutout_schema_to_mapping(schema)
#
# message_cp.instruct_content = {"class": schema["title"], "mapping": mapping, "value": ic.dict()}
# cb = message_cp.cause_by
# if cb:
# message_cp.cause_by = cb.serialize()
#
# return message_cp.dict()
#
# @classmethod
# def deserialize(cls, message_dict: dict):
# instruct_content = message_dict.get("instruct_content")
# if instruct_content:
# ic = instruct_content
# ic_obj = ActionOutput.create_model_class(class_name=ic["class"], mapping=ic["mapping"])
# ic_new = ic_obj(**ic["value"])
# message_dict.instruct_content = ic_new
# cause_by = message_dict.get("cause_by")
# if cause_by:
# message_dict.cause_by = Action.deserialize(cause_by)
#
# return Message(**message_dict)
def dict(self):
return {
"content": self.content,
"instruct_content": self.instruct_content,
"role": self.role,
"cause_by": self.cause_by,
"sent_from": self.sent_from,
"send_to": self.send_to,
"restricted_to": self.restricted_to
}
def to_dict(self) -> dict:
return {
"role": self.role,

View file

@ -15,7 +15,8 @@ from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.utils.common import NoMoneyException
from metagpt.utils.utils import read_json_file, write_json_file
from metagpt.utils.utils import read_json_file, write_json_file, serialize_decorator
from metagpt.const import SERDESER_PATH
class Team(BaseModel):
@ -30,29 +31,35 @@ class Team(BaseModel):
class Config:
arbitrary_types_allowed = True
def serialize(self, stg_path: Path):
def serialize(self, stg_path: Path = None):
stg_path = SERDESER_PATH.joinpath("team") if stg_path is None else stg_path
team_info_path = stg_path.joinpath("team_info.json")
write_json_file(team_info_path, {
"idea": self.idea,
"investment": self.investment
})
write_json_file(team_info_path, self.dict(exclude={"environment": True}))
self.environment.serialize(stg_path.joinpath("environment"))
self.environment.serialize(stg_path.joinpath("environment")) # save environment alone
def deserialize(self, stg_path: Path):
@classmethod
def recover(cls, stg_path: Path) -> "Team":
return cls.deserialize(stg_path)
@classmethod
def deserialize(cls, stg_path: Path) -> "Team":
""" stg_path = ./storage/team """
# recover team_info
team_info_path = stg_path.joinpath("team_info.json")
if not team_info_path.exists():
logger.error("recover storage not exist, not to recover and continue run the old project.")
team_info = read_json_file(team_info_path)
self.investment = team_info.get("investment", 10.0)
self.idea = team_info.get("idea", "")
raise FileNotFoundError("recover storage meta file `team_info.json` not exist, "
"not to recover and please start a new project.")
team_info: dict = read_json_file(team_info_path)
# recover environment
environment_path = stg_path.joinpath("environment")
self.environment = Environment()
self.environment.deserialize(stg_path=environment_path)
environment = Environment.deserialize(stg_path=stg_path.joinpath("environment"))
team_info.update({"environment": environment})
team = Team(**team_info)
return team
def hire(self, roles: list[Role]):
"""Hire roles to cooperate"""
@ -76,6 +83,7 @@ class Team(BaseModel):
def _save(self):
logger.info(self.json())
@serialize_decorator
async def run(self, n_round=3):
"""Run company until target round or no money"""
while n_round > 0:
@ -85,4 +93,3 @@ class Team(BaseModel):
self._check_balance()
await self.environment.run()
return self.environment.history

View file

@ -5,9 +5,7 @@
import copy
import pickle
from metagpt.actions.action_output import ActionOutput
from metagpt.schema import Message
from metagpt.actions.action import Action
from metagpt.utils.utils import import_class
def actionoutout_schema_to_mapping(schema: dict) -> dict:
@ -59,7 +57,7 @@ def actionoutput_str_to_mapping(mapping: dict) -> dict:
return new_mapping
def serialize_general_message(message: Message) -> dict:
def serialize_general_message(message: "Message") -> dict:
""" serialize Message, not to save"""
message_cp = copy.deepcopy(message)
ic = message_cp.instruct_content
@ -76,7 +74,7 @@ def serialize_general_message(message: Message) -> dict:
return message_cp.dict()
def serialize_message(message: Message):
def serialize_message(message: "Message"):
message_cp = copy.deepcopy(message) # avoid `instruct_content` value update by reference
ic = message_cp.instruct_content
if ic:
@ -90,29 +88,35 @@ def serialize_message(message: Message):
return msg_ser
def deserialize_general_message(message_dict: dict) -> Message:
def deserialize_general_message(message_dict: dict) -> "Message":
""" deserialize Message, not to load"""
instruct_content = message_dict.pop("instruct_content")
cause_by = message_dict.pop("cause_by")
message = Message(**message_dict)
message_cls = import_class("Message", "metagpt.schema")
message = message_cls(**message_dict)
if instruct_content:
ic = instruct_content
mapping = actionoutput_str_to_mapping(ic["mapping"])
ic_obj = ActionOutput.create_model_class(class_name=ic["class"], mapping=mapping)
actionoutput_class = import_class("ActionOutput", "metagpt.actions.action_output")
ic_obj = actionoutput_class.create_model_class(class_name=ic["class"], mapping=mapping)
ic_new = ic_obj(**ic["value"])
message.instruct_content = ic_new
if cause_by:
message.cause_by = Action.deser_class(cause_by)
action_class = import_class("Action", "metagpt.actions.action")
message.cause_by = action_class.deser_class(cause_by)
return message
def deserialize_message(message_ser: str) -> Message:
def deserialize_message(message_ser: str) -> "Message":
message = pickle.loads(message_ser)
if message.instruct_content:
ic = message.instruct_content
ic_obj = ActionOutput.create_model_class(class_name=ic["class"], mapping=ic["mapping"])
actionoutput_class = import_class("ActionOutput", "metagpt.actions.action_output")
ic_obj = actionoutput_class.create_model_class(class_name=ic["class"], mapping=ic["mapping"])
ic_new = ic_obj(**ic["value"])
message.instruct_content = ic_new

View file

@ -6,6 +6,9 @@ from typing import Any
import json
from pathlib import Path
import importlib
import traceback
from metagpt.logs import logger
def read_json_file(json_file: str, encoding=None) -> list[Any]:
@ -39,3 +42,46 @@ def import_class_inst(class_name: str, module_name: str, *args, **kwargs) -> obj
a_class = import_class(class_name, module_name)
class_inst = a_class(*args, **kwargs)
return class_inst
def format_trackback_info(limit: int = 2):
return traceback.format_exc(limit=limit)
def serialize_decorator(func):
async def wrapper(self, *args, **kwargs):
try:
result = await func(self, *args, **kwargs)
self.serialize() # Team.serialize
return result
except KeyboardInterrupt as kbi:
logger.error(f"KeyboardInterrupt occurs, start to serialize the project, exp:\n{format_trackback_info()}")
self.serialize() # Team.serialize
except Exception as exp:
logger.error(f"Exception occurs, start to serialize the project, exp:\n{format_trackback_info()}")
self.serialize() # Team.serialize
return wrapper
def role_raise_decorator(func):
async def wrapper(self, *args, **kwargs):
try:
return await func(self, *args, **kwargs)
except KeyboardInterrupt as kbi:
logger.error(f"KeyboardInterrupt: {kbi} occurs, start to serialize the project")
if self._rc.env:
newest_msgs = self._rc.env.memory.get(1)
if len(newest_msgs) > 0:
self._rc.memory.delete(newest_msgs[0])
raise Exception(format_trackback_info(limit=None)) # raise again to make it captured outside
except Exception as exp:
if self._rc.env:
newest_msgs = self._rc.env.memory.get(1)
if len(newest_msgs) > 0:
logger.warning("There is a exception in role's execution, in order to resume, "
"we delete the newest role communication message in the role's memory.")
self._rc.memory.delete(newest_msgs[0]) # remove newest msg of the role to make it observed again
raise Exception(format_trackback_info(limit=None)) # raise again to make it captured outside
return wrapper

View file

@ -1,10 +1,11 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from typing import Optional
import asyncio
import fire
from pathlib import Path
from metagpt.const import SERDES_PATH
from metagpt.roles import (
Architect,
Engineer,
@ -22,11 +23,11 @@ async def startup(
code_review: bool = False,
run_tests: bool = False,
implement: bool = True,
recover_path: bool = False,
recover_path: Optional[str] = None,
):
"""Run a startup. Be a boss."""
company = Team()
if not recover_path:
company = Team()
company.hire(
[
ProductManager(),
@ -45,8 +46,12 @@ async def startup(
# (bug fixing capability comes soon!)
company.hire([QaEngineer()])
else:
stg_path = SERDES_PATH.joinpath("team")
company.deserialize(stg_path=stg_path)
# # stg_path = SERDESER_PATH.joinpath("team")
stg_path = Path(recover_path)
if not stg_path.exists() or not str(stg_path).endswith("team"):
raise FileNotFoundError(f"{recover_path} not exists or not endswith `team`")
company = Team.recover(stg_path=stg_path)
idea = company.idea # use original idea
company.invest(investment)

View file

@ -11,20 +11,3 @@ from metagpt.actions import Action, WritePRD, WriteTest
def test_action_repr():
actions = [Action(), WriteTest(), WritePRD()]
assert "WriteTest" in str(actions)
def test_action_serdes():
action_info = WriteTest.ser_class()
assert action_info["action_class"] == "WriteTest"
action_class = Action.deser_class(action_info)
assert action_class == WriteTest
def test_action_class_serdes():
name = "write test"
action_info = WriteTest(name=name).serialize()
assert action_info["name"] == name
action = Action.deserialize(action_info)
assert action.name == name

View file

@ -2,84 +2,10 @@
# -*- coding: utf-8 -*-
# @Desc : unittest of Role
from pathlib import Path
import shutil
import pytest
from metagpt.roles.role import Role, RoleReactMode
from metagpt.actions.action import Action
from metagpt.schema import Message
from metagpt.actions.add_requirement import BossRequirement
from metagpt.roles.product_manager import ProductManager
serdes_path = Path(__file__).absolute().parent.joinpath("../../data/serdes_storage")
from metagpt.roles.role import Role
def test_role_serdes():
stg_path_prefix = serdes_path.joinpath("team/environment/roles/")
shutil.rmtree(serdes_path.joinpath("team"), ignore_errors=True)
pm = ProductManager()
role_tag = f"{pm.__class__.__name__}_{pm.name}"
stg_path = stg_path_prefix.joinpath(role_tag)
pm.serialize(stg_path)
assert stg_path.joinpath("actions/actions_info.json").exists()
new_pm = Role.deserialize(stg_path)
assert new_pm.name == pm.name
assert len(new_pm.get_memories(1)) == 0
class ActionOK(Action):
async def run(self, messages: list["Message"]):
return "ok"
class ActionRaise(Action):
async def run(self, messages: list["Message"]):
raise RuntimeError("parse error")
class RoleA(Role):
def __init__(self,
name: str = "RoleA",
profile: str = "Role A",
goal: str = "",
constraints: str = ""):
super(RoleA, self).__init__(name=name, profile=profile, goal=goal, constraints=constraints)
self._init_actions([ActionOK, ActionRaise])
self._watch([BossRequirement])
self._rc.react_mode = RoleReactMode.BY_ORDER
async def run(self, message: "Message" = None, stg_path: str = None):
try:
await super(RoleA, self).run(message)
except Exception as exp:
print("exp ", exp)
self.serialize(stg_path)
@pytest.mark.asyncio
async def test_role_serdes_interrupt():
role_a = RoleA()
shutil.rmtree(serdes_path.joinpath("team"), ignore_errors=True)
stg_path = serdes_path.joinpath(f"team/environment/roles/{role_a.__class__.__name__}_{role_a.name}")
await role_a.run(
message=Message(content="demo", cause_by=BossRequirement),
stg_path=stg_path
)
assert role_a._rc.memory.count() == 2
assert stg_path.joinpath("actions/todo.json").exists()
new_role_a: Role = Role.deserialize(stg_path)
assert new_role_a._rc.state == 1
await role_a.run(
message=Message(content="demo", cause_by=BossRequirement),
stg_path=stg_path
)
def test_role_desc():
role = Role(profile="Sales", desc="Best Seller")
assert role.profile == "Sales"
assert role._setting.desc == "Best Seller"

View file

@ -4,23 +4,33 @@
# @Desc :
import pytest
from metagpt.actions import Action
from metagpt.actions import Action, WritePRD, WriteTest
from metagpt.llm import LLM
from metagpt.provider.openai_api import OpenAIGPTAPI
def test_action_serialize():
action = Action()
ser_action_dict = action.dict()
assert "name" in ser_action_dict
assert "llm" in ser_action_dict
# assert "llm" not in ser_action_dict # not export
@pytest.mark.asyncio
async def test_action_deserialize():
action = Action()
serialized_data = action.dict()
new_action = Action(**serialized_data)
assert new_action.name == ""
assert new_action.llm == LLM()
assert len(await new_action._aask("who are you")) > 0
def test_action_serdeser():
action_info = WriteTest.ser_class()
assert action_info["action_class"] == "WriteTest"
action_class = Action.deser_class(action_info)
assert action_class == WriteTest

View file

@ -25,4 +25,4 @@ async def test_architect_deserialize():
assert new_role.name == "Bob"
assert len(new_role._actions) == 1
assert isinstance(new_role._actions[0], Action)
await new_role._actions[0].run(context="write a cli snake game")
await new_role._actions[0].run(context="write a cli snake game")

View file

@ -0,0 +1,91 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
from pathlib import Path
import shutil
from metagpt.schema import Message
from metagpt.actions.action_output import ActionOutput
from metagpt.roles.project_manager import ProjectManager
from metagpt.actions.add_requirement import BossRequirement
from metagpt.actions.project_management import WriteTasks
from metagpt.environment import Environment
from tests.metagpt.serialize_deserialize.test_serdeser_base import RoleC, ActionOK, serdeser_path
def test_env_serialize():
env = Environment()
ser_env_dict = env.dict()
assert "roles" in ser_env_dict
assert "memory" in ser_env_dict
def test_env_deserialize():
env = Environment()
env.publish_message(message=Message(content="test env serialize"))
ser_env_dict = env.dict()
new_env = Environment(**ser_env_dict)
assert len(new_env.roles) == 0
assert new_env.memory.storage[0].content == "test env serialize"
assert len(new_env.history) == 25
def test_environment_serdeser():
out_mapping = {"field1": (list[str], ...)}
out_data = {"field1": ["field1 value1", "field1 value2"]}
ic_obj = ActionOutput.create_model_class("prd", out_mapping)
message = Message(
content="prd",
instruct_content=ic_obj(**out_data),
role="product manager",
cause_by=BossRequirement
)
environment = Environment()
role_c = RoleC()
environment.add_role(role_c)
environment.publish_message(message)
ser_data = environment.dict()
assert ser_data["roles"]["Role C"]["name"] == "RoleC"
new_env: Environment = Environment(**ser_data)
assert len(new_env.roles) == 1
assert new_env.memory.count() == 1
assert new_env.memory.storage[0].instruct_content == ic_obj(**out_data)
assert list(new_env.roles.values())[0]._states == list(environment.roles.values())[0]._states
assert list(new_env.roles.values())[0]._actions == list(environment.roles.values())[0]._actions
assert isinstance(list(environment.roles.values())[0]._actions[0], ActionOK)
assert type(list(new_env.roles.values())[0]._actions[0]) == ActionOK
def test_environment_serdeser_v2():
environment = Environment()
pm = ProjectManager()
environment.add_role(pm)
ser_data = environment.dict()
new_env: Environment = Environment(**ser_data)
role = new_env.get_role(pm.profile)
assert isinstance(role, ProjectManager)
assert isinstance(role._actions[0], WriteTasks)
assert isinstance(list(new_env.roles.values())[0]._actions[0], WriteTasks)
def test_environment_serdeser_save():
environment = Environment()
role_c = RoleC()
shutil.rmtree(serdeser_path.joinpath("team"), ignore_errors=True)
stg_path = serdeser_path.joinpath("team/environment")
environment.add_role(role_c)
environment.serialize(stg_path)
new_env: Environment = Environment.deserialize(stg_path)
assert len(new_env.roles) == 1
assert type(list(new_env.roles.values())[0]._actions[0]) == ActionOK

View file

@ -3,6 +3,7 @@
# @Desc : unittest of memory
from pathlib import Path
from pydantic import BaseModel
from metagpt.schema import Message
from metagpt.memory.memory import Memory
@ -10,10 +11,36 @@ from metagpt.actions.action_output import ActionOutput
from metagpt.actions.design_api import WriteDesign
from metagpt.actions.add_requirement import BossRequirement
serdes_path = Path(__file__).absolute().parent.joinpath("../../data/serdes_storage")
from tests.metagpt.serialize_deserialize.test_serdeser_base import serdeser_path
def test_memory_serdes():
def test_memory_serdeser():
msg1 = Message(role="Boss",
content="write a snake game",
cause_by=BossRequirement)
out_mapping = {"field2": (list[str], ...)}
out_data = {"field2": ["field2 value1", "field2 value2"]}
ic_obj = ActionOutput.create_model_class("system_design", out_mapping)
msg2 = Message(role="Architect",
instruct_content=ic_obj(**out_data),
content="system design content",
cause_by=WriteDesign)
memory = Memory()
memory.add_batch([msg1, msg2])
ser_data = memory.dict()
new_memory = Memory(**ser_data)
assert new_memory.count() == 2
new_msg2 = new_memory.get(2)[0]
assert isinstance(new_msg2, BaseModel)
assert isinstance(new_memory.storage[-1], BaseModel)
assert new_memory.storage[-1].cause_by == WriteDesign
assert new_msg2.role == "Boss"
def test_memory_serdeser_save():
msg1 = Message(role="User",
content="write a 2048 game",
cause_by=BossRequirement)
@ -29,7 +56,7 @@ def test_memory_serdes():
memory = Memory()
memory.add_batch([msg1, msg2])
stg_path = serdes_path.joinpath("team/environment")
stg_path = serdeser_path.joinpath("team/environment")
memory.serialize(stg_path)
assert stg_path.joinpath("memory.json").exists()
@ -38,5 +65,6 @@ def test_memory_serdes():
new_msg2 = new_memory.get(1)[0]
assert new_msg2.instruct_content.field1 == ["field1 value1", "field1 value2"]
assert new_msg2.cause_by == WriteDesign
assert len(new_memory.index) == 2
stg_path.joinpath("memory.json").unlink()

View file

@ -14,9 +14,8 @@ async def test_product_manager_deserialize():
role = ProductManager()
ser_role_dict = role.dict(by_alias=True)
new_role = ProductManager(**ser_role_dict)
# new_role = ProductManager().deserialize(ser_role_dict)
assert new_role.name == "Alice"
assert len(new_role._actions) == 1
assert isinstance(new_role._actions[0], Action)
await new_role._actions[0].run([Message(content="write a cli snake game")])
await new_role._actions[0].run([Message(content="write a cli snake game")])

View file

@ -6,6 +6,7 @@ import pytest
from metagpt.roles.project_manager import ProjectManager
from metagpt.actions.action import Action
from metagpt.actions.project_management import WriteTasks
def test_project_manager_serialize():
@ -20,9 +21,10 @@ def test_project_manager_serialize():
async def test_project_manager_deserialize():
role = ProjectManager()
ser_role_dict = role.dict(by_alias=True)
new_role = ProjectManager(**ser_role_dict)
# new_role = ProjectManager().deserialize(ser_role_dict)
assert new_role.name == "Eve"
assert len(new_role._actions) == 1
assert isinstance(new_role._actions[0], Action)
await new_role._actions[0].run(context="write a cli snake game")
assert isinstance(new_role._actions[0], WriteTasks)
# await new_role._actions[0].run(context="write a cli snake game")

View file

@ -2,12 +2,30 @@
# @Date : 11/23/2023 4:49 PM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
from pathlib import Path
import shutil
import pytest
from metagpt.logs import logger
from metagpt.roles.role import Role
from metagpt.actions import WriteCode, WriteCodeReview
from metagpt.schema import Message
from metagpt.actions.add_requirement import BossRequirement
from metagpt.roles.product_manager import ProductManager
from metagpt.const import SERDESER_PATH
from metagpt.roles.engineer import Engineer
from metagpt.utils.utils import format_trackback_info
from metagpt.actions.action import Action
from tests.metagpt.serialize_deserialize.test_serdeser_base import RoleA, RoleB, RoleC, serdeser_path
def test_roles():
role_a = RoleA()
assert len(role_a._rc.watch) == 1
role_b = RoleB()
assert len(role_a._rc.watch) == 1
assert len(role_b._rc.watch) == 1
def test_role_serialize():
@ -30,12 +48,50 @@ def test_engineer_serialize():
async def test_engineer_deserialize():
role = Engineer(use_code_review=True)
ser_role_dict = role.dict(by_alias=True)
# new_role = Engineer().deserialize(ser_role_dict)
# also can be deserialized in this way:
new_role = Engineer(**ser_role_dict)
assert new_role.name == "Alex"
assert new_role.use_code_review is True
assert len(new_role._actions) == 2
assert isinstance(new_role._actions[0], Action)
assert isinstance(new_role._actions[1], Action)
await new_role._actions[0].run(context="write a cli snake game", filename="test_code")
assert isinstance(new_role._actions[0], WriteCode)
assert isinstance(new_role._actions[1], WriteCodeReview)
# await new_role._actions[0].run(context="write a cli snake game", filename="test_code")
def test_role_serdeser_save():
stg_path_prefix = serdeser_path.joinpath("team/environment/roles/")
shutil.rmtree(serdeser_path.joinpath("team"), ignore_errors=True)
pm = ProductManager()
role_tag = f"{pm.__class__.__name__}_{pm.name}"
stg_path = stg_path_prefix.joinpath(role_tag)
pm.serialize(stg_path)
new_pm = Role.deserialize(stg_path)
assert new_pm.name == pm.name
assert len(new_pm.get_memories(1)) == 0
@pytest.mark.asyncio
async def test_role_serdeser_interrupt():
role_c = RoleC()
shutil.rmtree(SERDESER_PATH.joinpath("team"), ignore_errors=True)
stg_path = SERDESER_PATH.joinpath(f"team/environment/roles/{role_c.__class__.__name__}_{role_c.name}")
try:
await role_c.run(
message=Message(content="demo", cause_by=BossRequirement)
)
except Exception as exp:
logger.error(f"Exception in `role_a.run`, detail: {format_trackback_info()}")
role_c.serialize(stg_path)
assert role_c._rc.memory.count() == 2
new_role_a: Role = Role.deserialize(stg_path)
assert new_role_a._rc.state == 1
with pytest.raises(Exception):
await role_c.run(
message=Message(content="demo", cause_by=BossRequirement)
)

View file

@ -0,0 +1,49 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : unittest of schema ser&deser
from metagpt.schema import Message
from metagpt.actions.action_output import ActionOutput
from metagpt.actions.write_code import WriteCode
from tests.metagpt.serialize_deserialize.test_serdeser_base import MockMessage
def test_message_serdeser():
out_mapping = {"field3": (str, ...), "field4": (list[str], ...)}
out_data = {"field3": "field3 value3", "field4": ["field4 value1", "field4 value2"]}
ic_obj = ActionOutput.create_model_class("code", out_mapping)
message = Message(
content="code",
instruct_content=ic_obj(**out_data),
role="engineer",
cause_by=WriteCode
)
ser_data = message.dict()
assert ser_data["cause_by"] == {
"action_class": "WriteCode",
"module_name": "metagpt.actions.write_code"
}
assert ser_data["instruct_content"]["class"] == "code"
new_message = Message(**ser_data)
assert new_message.cause_by == WriteCode
assert new_message.cause_by in [WriteCode]
assert new_message.instruct_content == ic_obj(**out_data)
def test_message_without_postprocess():
""" to explain `instruct_content` should be postprocessed """
out_mapping = {"field1": (list[str], ...)}
out_data = {"field1": ["field1 value1", "field1 value2"]}
ic_obj = ActionOutput.create_model_class("code", out_mapping)
message = MockMessage(
content="code",
instruct_content=ic_obj(**out_data)
)
ser_data = message.dict()
assert ser_data["instruct_content"] == {"field1": ["field1 value1", "field1 value2"]}
new_message = MockMessage(**ser_data)
assert new_message.instruct_content != ic_obj(**out_data)

View file

@ -0,0 +1,89 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : base test actions / roles used in unittest
from pydantic import BaseModel, Field
from pathlib import Path
import asyncio
from metagpt.actions.action import Action
from metagpt.roles.role import Role, RoleReactMode
from metagpt.actions.add_requirement import BossRequirement
from metagpt.actions.action_output import ActionOutput
serdeser_path = Path(__file__).absolute().parent.joinpath("../../data/serdeser_storage")
class MockMessage(BaseModel):
""" to test normal dict without postprocess """
content: str = ""
instruct_content: BaseModel = Field(default=None)
class ActionPass(Action):
name: str = Field(default="ActionPass")
async def run(self, messages: list["Message"]) -> ActionOutput:
await asyncio.sleep(5) # sleep to make other roles can watch the executed Message
output_mapping = {
"result": (str, ...)
}
pass_class = ActionOutput.create_model_class("pass", output_mapping)
pass_output = ActionOutput("ActionPass run passed", pass_class(**{"result": "pass result"}))
return pass_output
class ActionOK(Action):
name: str = Field(default="ActionOK")
async def run(self, messages: list["Message"]) -> str:
await asyncio.sleep(5)
return "ok"
class ActionRaise(Action):
name: str = Field(default="ActionRaise")
async def run(self, messages: list["Message"]) -> str:
raise RuntimeError("parse error in ActionRaise")
class RoleA(Role):
name: str = Field(default="RoleA")
profile: str = Field(default="Role A")
goal: str = "RoleA's goal"
constraints: str = "RoleA's constraints"
def __init__(self, **kwargs):
super(RoleA, self).__init__(**kwargs)
self._init_actions([ActionPass])
self._watch([BossRequirement])
class RoleB(Role):
name: str = Field(default="RoleB")
profile: str = Field(default="Role B")
goal: str = "RoleB's goal"
constraints: str = "RoleB's constraints"
def __init__(self, **kwargs):
super(RoleB, self).__init__(**kwargs)
self._init_actions([ActionOK, ActionRaise])
self._watch([ActionPass])
self._rc.react_mode = RoleReactMode.BY_ORDER
class RoleC(Role):
name: str = Field(default="RoleC")
profile: str = Field(default="Role C")
goal: str = "RoleC's goal"
constraints: str = "RoleC's constraints"
def __init__(self, **kwargs):
super(RoleC, self).__init__(**kwargs)
self._init_actions([ActionOK, ActionRaise])
self._watch([BossRequirement])
self._rc.react_mode = RoleReactMode.BY_ORDER

View file

@ -2,46 +2,120 @@
# @Date : 11/27/2023 10:07 AM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
from pathlib import Path
import shutil
import pytest
from metagpt.environment import Environment
from metagpt.schema import Message
from metagpt.software_company import SoftwareCompany
from metagpt.roles import ProjectManager, ProductManager, Architect
from metagpt.team import Team
from metagpt.const import SERDESER_PATH
from tests.metagpt.serialize_deserialize.test_serdeser_base import RoleA, RoleB, RoleC, serdeser_path, ActionOK
def test_env_serialize():
env = Environment()
ser_env_dict = env.dict()
assert "roles" in ser_env_dict
assert "memory" in ser_env_dict
assert "memory" in ser_env_dict
def test_team_deserialize():
company = Team()
def test_env_deserialize():
env = Environment()
env.publish_message(message=Message(content="test env serialize"))
ser_env_dict = env.dict()
new_env = Environment(**ser_env_dict)
assert len(new_env.roles) == 0
assert new_env.memory.storage[0].content == "test env serialize"
assert len(new_env.history) == 25
def test_softwarecompany_deserialize():
team = SoftwareCompany()
team.hire(
pm = ProductManager()
arch = Architect()
company.hire(
[
ProductManager(),
Architect(),
pm,
arch,
ProjectManager(),
]
)
assert len(team.environment.get_roles()) == 3
ser_team_dict = team.dict()
new_team = SoftwareCompany(**ser_team_dict)
assert len(new_team.environment.get_roles()) == 3
assert new_team.environment.get_role('Product Manager') is not None
assert new_team.environment.get_role('Product Manager') is not None
assert new_team.environment.get_role('Architect') is not None
assert len(company.environment.get_roles()) == 3
ser_company = company.dict()
new_company = Team(**ser_company)
assert len(new_company.environment.get_roles()) == 3
assert new_company.environment.get_role(pm.profile) is not None
new_pm = new_company.environment.get_role(pm.profile)
assert type(new_pm) == ProductManager
assert new_company.environment.get_role(pm.profile) is not None
assert new_company.environment.get_role(arch.profile) is not None
def test_team_serdeser_save():
company = Team()
company.hire([RoleC()])
stg_path = serdeser_path.joinpath("team")
shutil.rmtree(stg_path, ignore_errors=True)
company.serialize(stg_path=stg_path)
new_company = Team.deserialize(stg_path)
assert len(new_company.environment.roles) == 1
@pytest.mark.asyncio
async def test_team_recover():
idea = "write a snake game"
stg_path = SERDESER_PATH.joinpath("team")
shutil.rmtree(stg_path, ignore_errors=True)
company = Team()
role_c = RoleC()
company.hire([role_c])
company.start_project(idea)
await company.run(n_round=4)
ser_data = company.dict()
new_company = Team(**ser_data)
new_role_c = new_company.environment.get_role(role_c.profile)
assert new_role_c._rc.memory == role_c._rc.memory
assert new_role_c._rc.env == role_c._rc.env
assert new_role_c._rc.env.memory == role_c._rc.env.memory
assert new_company.environment.memory.count() == 1
assert type(list(new_company.environment.roles.values())[0]._actions[0]) == ActionOK
new_company.start_project(idea)
await new_company.run(n_round=4)
@pytest.mark.asyncio
async def test_team_recover_save():
idea = "write a 2048 web game"
stg_path = SERDESER_PATH.joinpath("team")
shutil.rmtree(stg_path, ignore_errors=True)
company = Team()
role_c = RoleC()
company.hire([role_c])
company.start_project(idea)
await company.run(n_round=4)
new_company = Team.recover(stg_path)
new_role_c = new_company.environment.get_role(role_c.profile)
assert new_role_c._rc.memory == role_c._rc.memory
assert new_role_c._rc.env != role_c._rc.env
assert new_role_c.recovered != role_c.recovered # here cause previous ut is `!=`
assert new_role_c._rc.todo != role_c._rc.todo # serialize exclude `_rc.todo`
assert new_role_c._rc.news != role_c._rc.news # serialize exclude `_rc.news`
assert new_role_c._rc.env.memory == role_c._rc.env.memory
new_company.start_project(idea)
await new_company.run(n_round=4)
@pytest.mark.asyncio
async def test_team_recover_multi_roles_save():
idea = "write a snake game"
stg_path = SERDESER_PATH.joinpath("team")
shutil.rmtree(stg_path, ignore_errors=True)
company = Team()
company.hire([RoleA(), RoleB()])
company.start_project(idea)
await company.run(n_round=4)
new_company = Team.recover(stg_path)
new_company.start_project(idea)
await new_company.run(n_round=4)

View file

@ -13,7 +13,7 @@ def test_action_serialize():
action = WritePRD()
ser_action_dict = action.dict()
assert "name" in ser_action_dict
assert "llm" in ser_action_dict
# assert "llm" in ser_action_dict # not export
@pytest.mark.asyncio
@ -21,8 +21,7 @@ async def test_action_deserialize():
action = WritePRD()
serialized_data = action.dict()
new_action = WritePRD(**serialized_data)
# new_action = WritePRD().deserialize(serialized_data)
assert new_action.name == ""
assert new_action.llm == LLM()
assert len(await new_action.run([Message(content="write a cli snake game")])) > 0
action_output = await new_action.run([Message(content="write a cli snake game")])
assert len(action_output.content) > 0

View file

@ -12,14 +12,14 @@ def test_write_design_serialize():
action = WriteCode()
ser_action_dict = action.dict()
assert ser_action_dict["name"] == "WriteCode"
assert "llm" in ser_action_dict
# assert "llm" in ser_action_dict # not export
def test_write_task_serialize():
action = WriteCodeReview()
ser_action_dict = action.dict()
assert ser_action_dict["name"] == "WriteCodeReview"
assert "llm" in ser_action_dict
# assert "llm" in ser_action_dict # not export
@pytest.mark.asyncio
@ -27,7 +27,6 @@ async def test_write_code_deserialize():
action = WriteCode()
serialized_data = action.dict()
new_action = WriteCode(**serialized_data)
# new_action = WriteCode().deserialize(serialized_data)
assert new_action.name == "WriteCode"
assert new_action.llm == LLM()
await new_action.run(context="write a cli snake game", filename="test_code")
@ -38,9 +37,8 @@ async def test_write_code_review_deserialize():
action = WriteCodeReview()
serialized_data = action.dict()
new_action = WriteCodeReview(**serialized_data)
# new_action = WriteCodeReview().deserialize(serialized_data)
code = await WriteCode().run(context="write a cli snake game", filename="test_code")
assert new_action.name == "WriteCodeReview"
assert new_action.llm == LLM()
await new_action.run(context="write a cli snake game", code =code, filename="test_rewrite_code")
await new_action.run(context="write a cli snake game", code=code, filename="test_rewrite_code")

View file

@ -12,21 +12,21 @@ def test_write_design_serialize():
action = WriteDesign()
ser_action_dict = action.dict()
assert "name" in ser_action_dict
assert "llm" in ser_action_dict
# assert "llm" in ser_action_dict # not export
def test_write_task_serialize():
action = WriteTasks()
ser_action_dict = action.dict()
assert "name" in ser_action_dict
assert "llm" in ser_action_dict
# assert "llm" in ser_action_dict # not export
@pytest.mark.asyncio
async def test_write_design_deserialize():
action = WriteDesign()
serialized_data = action.dict()
new_action = WriteDesign().deserialize(serialized_data)
new_action = WriteDesign(**serialized_data)
assert new_action.name == ""
assert new_action.llm == LLM()
await new_action.run(context="write a cli snake game")
@ -37,7 +37,6 @@ async def test_write_task_deserialize():
action = WriteTasks()
serialized_data = action.dict()
new_action = WriteTasks(**serialized_data)
# new_action = WriteTasks().deserialize(serialized_data)
assert new_action.name == "CreateTasks"
assert new_action.llm == LLM()
await new_action.run(context="write a cli snake game")

View file

@ -8,17 +8,15 @@
import pytest
from pathlib import Path
import shutil
from metagpt.actions import BossRequirement
from metagpt.environment import Environment
from metagpt.logs import logger
from metagpt.roles import Architect, ProductManager, Role
from metagpt.schema import Message
from tests.metagpt.roles.test_role import RoleA
serdes_path = Path(__file__).absolute().parent.joinpath("../data/serdes_storage")
serdeser_path = Path(__file__).absolute().parent.joinpath("../data/serdeser_storage")
@pytest.fixture
@ -27,14 +25,23 @@ def env():
def test_add_role(env: Environment):
role = ProductManager("Alice", "product manager", "create a new product", "limited resources")
role = ProductManager(name="Alice",
profile="product manager",
goal="create a new product",
constraints="limited resources")
env.add_role(role)
assert env.get_role(role.profile) == role
def test_get_roles(env: Environment):
role1 = Role("Alice", "product manager", "create a new product", "limited resources")
role2 = Role("Bob", "engineer", "develop the new product", "short deadline")
role1 = Role(name="Alice",
profile="product manager",
goal="create a new product",
constraints="limited resources")
role2 = Role(name="Bob",
profile="engineer",
goal="develop the new product",
constraints="short deadline")
env.add_role(role1)
env.add_role(role2)
roles = env.get_roles()
@ -43,8 +50,14 @@ def test_get_roles(env: Environment):
@pytest.mark.asyncio
async def test_publish_and_process_message(env: Environment):
product_manager = ProductManager("Alice", "Product Manager", "做AI Native产品", "资源有限")
architect = Architect("Bob", "Architect", "设计一个可用、高效、较低成本的系统,包括数据结构与接口", "资源有限,需要节省成本")
product_manager = ProductManager(name="Alice",
profile="Product Manager",
goal="做AI Native产品",
constraints="资源有限")
architect = Architect(name="Bob",
profile="Architect",
goal="设计一个可用、高效、较低成本的系统,包括数据结构与接口",
constraints="资源有限,需要节省成本")
env.add_roles([product_manager, architect])
env.publish_message(Message(role="BOSS", content="需要一个基于LLM做总结的搜索引擎", cause_by=BossRequirement))
@ -52,18 +65,3 @@ async def test_publish_and_process_message(env: Environment):
await env.run(k=2)
logger.info(f"{env.history=}")
assert len(env.history) > 10
def test_environment_serdes():
environment = Environment()
role_a = RoleA()
shutil.rmtree(serdes_path.joinpath("team"), ignore_errors=True)
stg_path = serdes_path.joinpath("team/environment")
environment.add_role(role_a)
environment.serialize(stg_path)
new_env: Environment = Environment()
new_env.deserialize(stg_path)
assert len(new_env.roles) == 1

View file

@ -1,14 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/5/11 14:44
@Author : alexanderwu
@File : test_role.py
"""
from metagpt.roles import Role
def test_role_desc():
i = Role(profile='Sales', desc='Best Seller')
assert i.profile == 'Sales'
assert i._setting.desc == 'Best Seller'

View file

@ -25,7 +25,7 @@ def test_messages():
assert all([i in text for i in roles])
def test_message_serdes():
def test_message_serdeser():
out_mapping = {"field3": (str, ...), "field4": (list[str], ...)}
out_data = {"field3": "field3 value3", "field4": ["field4 value1", "field4 value2"]}
ic_obj = ActionOutput.create_model_class("code", out_mapping)
@ -37,7 +37,7 @@ def test_message_serdes():
cause_by=WriteCode
)
message_dict = serialize_general_message(message)
assert message_dict["cause_by"] == {"action_class": "WriteCode"}
assert message_dict["cause_by"] == {"action_class": "WriteCode", "module_name": "metagpt.actions.write_code"}
assert message_dict["instruct_content"] == {
"class": "code",
"mapping": {

View file

@ -2,26 +2,12 @@
# -*- coding: utf-8 -*-
# @Desc : unittest of team
from pathlib import Path
import shutil
from metagpt.team import Team
from tests.metagpt.roles.test_role import RoleA
serdes_path = Path(__file__).absolute().parent.joinpath("../data/serdes_storage")
from metagpt.roles.project_manager import ProjectManager
def test_team_serdes():
def test_team():
company = Team()
company.hire([RoleA()])
company.hire([ProjectManager()])
stg_path = serdes_path.joinpath("team")
shutil.rmtree(stg_path, ignore_errors=True)
company.serialize(stg_path=stg_path)
new_company = Team()
new_company.deserialize(stg_path)
assert len(new_company.environment.roles) == 1
assert len(company.environment.roles) == 1