diff --git a/examples/agent_creator.py b/examples/agent_creator.py index 325e7c260..d13cbcff2 100644 --- a/examples/agent_creator.py +++ b/examples/agent_creator.py @@ -1,22 +1,24 @@ -''' +""" Filename: MetaGPT/examples/agent_creator.py Created Date: Tuesday, September 12th 2023, 3:28:37 pm Author: garylin2099 -''' +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. +""" import re -from metagpt.const import PROJECT_ROOT, WORKSPACE_ROOT from metagpt.actions import Action +from metagpt.const import PROJECT_ROOT, WORKSPACE_ROOT +from metagpt.logs import logger from metagpt.roles import Role from metagpt.schema import Message -from metagpt.logs import logger +from metagpt.utils.common import get_object_name with open(PROJECT_ROOT / "examples/build_customized_agent.py", "r") as f: # use official example script to guide AgentCreator MULTI_ACTION_AGENT_CODE_EXAMPLE = f.read() -class CreateAgent(Action): +class CreateAgent(Action): PROMPT_TEMPLATE = """ ### BACKGROUND You are using an agent framework called metagpt to write agents capable of different actions, @@ -34,7 +36,6 @@ class CreateAgent(Action): """ async def run(self, example: str, instruction: str): - prompt = self.PROMPT_TEMPLATE.format(example=example, instruction=instruction) # logger.info(prompt) @@ -46,13 +47,14 @@ class CreateAgent(Action): @staticmethod def parse_code(rsp): - pattern = r'```python(.*)```' + pattern = r"```python(.*)```" match = re.search(pattern, rsp, re.DOTALL) code_text = match.group(1) if match else "" with open(WORKSPACE_ROOT / "agent_created_agent.py", "w") as f: f.write(code_text) return code_text + class AgentCreator(Role): def __init__( self, @@ -72,15 +74,15 @@ class AgentCreator(Role): instruction = msg.content code_text = await CreateAgent().run(example=self.agent_template, instruction=instruction) - msg = Message(content=code_text, role=self.profile, cause_by=todo) + msg = Message(content=code_text, role=self.profile, cause_by=get_object_name(todo)) return msg + if __name__ == "__main__": import asyncio async def main(): - agent_template = MULTI_ACTION_AGENT_CODE_EXAMPLE creator = AgentCreator(agent_template=agent_template) diff --git a/examples/build_customized_agent.py b/examples/build_customized_agent.py index 87d7a9c76..a953dee15 100644 --- a/examples/build_customized_agent.py +++ b/examples/build_customized_agent.py @@ -1,21 +1,23 @@ -''' +""" Filename: MetaGPT/examples/build_customized_agent.py Created Date: Tuesday, September 19th 2023, 6:52:25 pm Author: garylin2099 -''' +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. +""" +import asyncio import re import subprocess -import asyncio import fire from metagpt.actions import Action +from metagpt.logs import logger from metagpt.roles import Role from metagpt.schema import Message -from metagpt.logs import logger +from metagpt.utils.common import get_object_name + class SimpleWriteCode(Action): - PROMPT_TEMPLATE = """ Write a python function that can {instruction} and provide two runnnable test cases. Return ```python your_code_here ``` with NO other texts, @@ -35,7 +37,6 @@ class SimpleWriteCode(Action): super().__init__(name, context, llm) async def run(self, instruction: str): - prompt = self.PROMPT_TEMPLATE.format(instruction=instruction) rsp = await self._aask(prompt) @@ -46,11 +47,12 @@ class SimpleWriteCode(Action): @staticmethod def parse_code(rsp): - pattern = r'```python(.*)```' + pattern = r"```python(.*)```" match = re.search(pattern, rsp, re.DOTALL) code_text = match.group(1) if match else rsp return code_text + class SimpleRunCode(Action): def __init__(self, name="SimpleRunCode", context=None, llm=None): super().__init__(name, context, llm) @@ -61,6 +63,7 @@ class SimpleRunCode(Action): logger.info(f"{code_result=}") return code_result + class SimpleCoder(Role): def __init__( self, @@ -75,14 +78,15 @@ class SimpleCoder(Role): logger.info(f"{self._setting}: ready to {self._rc.todo}") todo = self._rc.todo - msg = self._rc.memory.get()[-1] # retrieve the latest memory + msg = self._rc.memory.get()[-1] # retrieve the latest memory instruction = msg.content code_text = await SimpleWriteCode().run(instruction) - msg = Message(content=code_text, role=self.profile, cause_by=todo) + msg = Message(content=code_text, role=self.profile, cause_by=get_object_name(todo)) return msg + class RunnableCoder(Role): def __init__( self, @@ -116,7 +120,7 @@ class RunnableCoder(Role): code_text = msg.content result = await SimpleRunCode().run(code_text) - msg = Message(content=result, role=self.profile, cause_by=todo) + msg = Message(content=result, role=self.profile, cause_by=get_object_name(todo)) self._rc.memory.add(msg) return msg @@ -128,6 +132,7 @@ class RunnableCoder(Role): await self._act() return Message(content="All job done", role=self.profile) + def main(msg="write a function that calculates the sum of a list"): # role = SimpleCoder() role = RunnableCoder() @@ -135,5 +140,6 @@ def main(msg="write a function that calculates the sum of a list"): result = asyncio.run(role.run(msg)) logger.info(result) -if __name__ == '__main__': + +if __name__ == "__main__": fire.Fire(main) diff --git a/examples/debate.py b/examples/debate.py index 05db28070..ade1a6fc4 100644 --- a/examples/debate.py +++ b/examples/debate.py @@ -1,17 +1,20 @@ -''' +""" Filename: MetaGPT/examples/debate.py Created Date: Tuesday, September 19th 2023, 6:52:25 pm Author: garylin2099 -''' +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. +""" import asyncio import platform + import fire -from metagpt.software_company import SoftwareCompany from metagpt.actions import Action, BossRequirement +from metagpt.logs import logger from metagpt.roles import Role from metagpt.schema import Message -from metagpt.logs import logger +from metagpt.software_company import SoftwareCompany + class ShoutOut(Action): """Action: Shout out loudly in a debate (quarrel)""" @@ -31,7 +34,6 @@ class ShoutOut(Action): super().__init__(name, context, llm) async def run(self, context: str, name: str, opponent_name: str): - prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name) # logger.info(prompt) @@ -39,6 +41,7 @@ class ShoutOut(Action): return rsp + class Trump(Role): def __init__( self, @@ -55,13 +58,13 @@ class Trump(Role): async def _observe(self) -> int: await super()._observe() # accept messages sent (from opponent) to self, disregard own messages from the last round - self._rc.news = [msg for msg in self._rc.news if msg.send_to == self.name] + self._rc.news = [msg for msg in self._rc.news if msg.is_recipient({self.name})] return len(self._rc.news) async def _act(self) -> Message: logger.info(f"{self._setting}: ready to {self._rc.todo}") - msg_history = self._rc.memory.get_by_actions([ShoutOut]) + msg_history = self._rc.memory.get_by_actions([ShoutOut.get_class_name()]) context = [] for m in msg_history: context.append(str(m)) @@ -72,13 +75,14 @@ class Trump(Role): msg = Message( content=rsp, role=self.profile, - cause_by=ShoutOut, - sent_from=self.name, - send_to=self.opponent_name, + cause_by=ShoutOut.get_class_name(), + tx_from=self.name, + tx_to=self.opponent_name, ) return msg + class Biden(Role): def __init__( self, @@ -96,13 +100,14 @@ class Biden(Role): await super()._observe() # accept the very first human instruction (the debate topic) or messages sent (from opponent) to self, # disregard own messages from the last round - self._rc.news = [msg for msg in self._rc.news if msg.cause_by == BossRequirement or msg.send_to == self.name] + message_filter = {BossRequirement.get_class_name(), self.name} + self._rc.news = [msg for msg in self._rc.news if msg.is_recipient(message_filter)] return len(self._rc.news) async def _act(self) -> Message: logger.info(f"{self._setting}: ready to {self._rc.todo}") - msg_history = self._rc.memory.get_by_actions([BossRequirement, ShoutOut]) + msg_history = self._rc.memory.get_by_actions([BossRequirement.get_class_name(), ShoutOut.get_class_name()]) context = [] for m in msg_history: context.append(str(m)) @@ -113,17 +118,19 @@ class Biden(Role): msg = Message( content=rsp, role=self.profile, - cause_by=ShoutOut, - sent_from=self.name, - send_to=self.opponent_name, + cause_by=ShoutOut.get_class_name(), + tx_from=self.name, + tx_to=self.opponent_name, ) return msg -async def startup(idea: str, investment: float = 3.0, n_round: int = 5, - code_review: bool = False, run_tests: bool = False): + +async def startup( + idea: str, investment: float = 3.0, n_round: int = 5, code_review: bool = False, run_tests: bool = False +): """We reuse the startup paradigm for roles to interact with each other. - Now we run a startup of presidents and watch they quarrel. :) """ + Now we run a startup of presidents and watch they quarrel. :)""" company = SoftwareCompany() company.hire([Biden(), Trump()]) company.invest(investment) @@ -133,7 +140,7 @@ async def startup(idea: str, investment: float = 3.0, n_round: int = 5, def main(idea: str, investment: float = 3.0, n_round: int = 10): """ - :param idea: Debate topic, such as "Topic: The U.S. should commit more in climate change fighting" + :param idea: Debate topic, such as "Topic: The U.S. should commit more in climate change fighting" or "Trump: Climate change is a hoax" :param investment: contribute a certain dollar amount to watch the debate :param n_round: maximum rounds of the debate @@ -144,5 +151,5 @@ def main(idea: str, investment: float = 3.0, n_round: int = 10): asyncio.run(startup(idea, investment, n_round)) -if __name__ == '__main__': +if __name__ == "__main__": fire.Fire(main) diff --git a/examples/sk_agent.py b/examples/sk_agent.py index a7513e838..19ee53669 100644 --- a/examples/sk_agent.py +++ b/examples/sk_agent.py @@ -4,6 +4,7 @@ @Time : 2023/9/13 12:36 @Author : femto Zheng @File : sk_agent.py +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. """ import asyncio @@ -39,7 +40,7 @@ async def basic_planner_example(): role.import_semantic_skill_from_directory(SKILL_DIRECTORY, "WriterSkill") role.import_skill(TextSkill(), "TextSkill") # using BasicPlanner - await role.run(Message(content=task, cause_by=BossRequirement)) + await role.run(Message(content=task, cause_by=BossRequirement.get_class_name())) async def sequential_planner_example(): @@ -53,7 +54,7 @@ async def sequential_planner_example(): role.import_semantic_skill_from_directory(SKILL_DIRECTORY, "WriterSkill") role.import_skill(TextSkill(), "TextSkill") # using BasicPlanner - await role.run(Message(content=task, cause_by=BossRequirement)) + await role.run(Message(content=task, cause_by=BossRequirement.get_class_name())) async def basic_planner_web_search_example(): @@ -64,7 +65,7 @@ async def basic_planner_web_search_example(): role.import_skill(SkSearchEngine(), "WebSearchSkill") # role.import_semantic_skill_from_directory(skills_directory, "QASkill") - await role.run(Message(content=task, cause_by=BossRequirement)) + await role.run(Message(content=task, cause_by=BossRequirement.get_class_name())) async def action_planner_example(): @@ -75,7 +76,7 @@ async def action_planner_example(): role.import_skill(TimeSkill(), "time") role.import_skill(TextSkill(), "text") task = "What is the sum of 110 and 990?" - await role.run(Message(content=task, cause_by=BossRequirement)) # it will choose mathskill.Add + await role.run(Message(content=task, cause_by=BossRequirement.get_class_name())) # it will choose mathskill.Add if __name__ == "__main__": diff --git a/metagpt/actions/action.py b/metagpt/actions/action.py index 790295d55..1954e750a 100644 --- a/metagpt/actions/action.py +++ b/metagpt/actions/action.py @@ -16,9 +16,10 @@ from metagpt.llm import LLM from metagpt.logs import logger from metagpt.utils.common import OutputParser from metagpt.utils.custom_decoder import CustomDecoder +from metagpt.utils.named import Named -class Action(ABC): +class Action(ABC, Named): def __init__(self, name: str = "", context=None, llm: LLM = None): self.name: str = name if llm is None: diff --git a/metagpt/actions/write_code.py b/metagpt/actions/write_code.py index c000805c5..421211d60 100644 --- a/metagpt/actions/write_code.py +++ b/metagpt/actions/write_code.py @@ -5,13 +5,14 @@ @Author : alexanderwu @File : write_code.py """ +from tenacity import retry, stop_after_attempt, wait_fixed + from metagpt.actions import WriteDesign from metagpt.actions.action import Action from metagpt.const import WORKSPACE_ROOT from metagpt.logs import logger from metagpt.schema import Message from metagpt.utils.common import CodeParser -from tenacity import retry, stop_after_attempt, wait_fixed PROMPT_TEMPLATE = """ NOTICE @@ -55,7 +56,8 @@ class WriteCode(Action): if self._is_invalid(filename): return - design = [i for i in context if i.cause_by == WriteDesign][0] + message_filter = {WriteDesign.get_class_name()} + design = [i for i in context if i.is_recipient(message_filter)][0] ws_name = CodeParser.parse_str(block="Python package name", text=design.content) ws_path = WORKSPACE_ROOT / ws_name @@ -74,9 +76,8 @@ class WriteCode(Action): async def run(self, context, filename): prompt = PROMPT_TEMPLATE.format(context=context, filename=filename) - logger.info(f'Writing {filename}..') + logger.info(f"Writing {filename}..") code = await self.write_code(prompt) # code_rsp = await self._aask_v1(prompt, "code_rsp", OUTPUT_MAPPING) # self._save(context, filename, code) return code - \ No newline at end of file diff --git a/metagpt/const.py b/metagpt/const.py index 7f3f87dfa..3fbc26784 100644 --- a/metagpt/const.py +++ b/metagpt/const.py @@ -41,3 +41,8 @@ INVOICE_OCR_TABLE_PATH = DATA_PATH / "invoice_table" SKILL_DIRECTORY = PROJECT_ROOT / "metagpt/skills" MEM_TTL = 24 * 30 * 3600 + +MESSAGE_ROUTE_FROM = "tx_from" +MESSAGE_ROUTE_TO = "tx_to" +MESSAGE_ROUTE_CAUSE_BY = "cause_by" +MESSAGE_META_ROLE = "role" diff --git a/metagpt/environment.py b/metagpt/environment.py index 24e6ada2f..ba0645a36 100644 --- a/metagpt/environment.py +++ b/metagpt/environment.py @@ -4,60 +4,61 @@ @Time : 2023/5/11 22:12 @Author : alexanderwu @File : environment.py +@Modified By: mashenquan, 2023-11-1. Optimization: + 1. Remove the functionality of `Environment` class as a public message buffer. + 2. Standardize the message forwarding behavior of the `Environment` class. + 3. Add the `is_idle` property. """ import asyncio from typing import Iterable from pydantic import BaseModel, Field -from metagpt.memory import Memory +from metagpt.logs import logger from metagpt.roles import Role from metagpt.schema import Message class Environment(BaseModel): """环境,承载一批角色,角色可以向环境发布消息,可以被其他角色观察到 - Environment, hosting a batch of roles, roles can publish messages to the environment, and can be observed by other roles - + Environment, hosting a batch of roles, roles can publish messages to the environment, and can be observed by other roles + """ roles: dict[str, Role] = Field(default_factory=dict) - memory: Memory = Field(default_factory=Memory) - history: str = Field(default='') class Config: arbitrary_types_allowed = True def add_role(self, role: Role): """增加一个在当前环境的角色 - Add a role in the current environment + Add a role in the current environment """ role.set_env(self) self.roles[role.profile] = role def add_roles(self, roles: Iterable[Role]): """增加一批在当前环境的角色 - Add a batch of characters in the current environment + Add a batch of characters in the current environment """ for role in roles: self.add_role(role) def publish_message(self, message: Message): - """向当前环境发布信息 - Post information to the current environment - """ - # self.message_queue.put(message) - self.memory.add(message) - self.history += f"\n{message}" + """Distribute the message to the recipients.""" + logger.info(f"publish_message: {message.save()}") + found = False + for r in self.roles.values(): + if message.is_recipient(r.subscribed_tags): + r.async_put_message(message) + found = True + if not found: + logger.warning(f"Message no recipients: {message.save()}") async def run(self, k=1): """处理一次所有信息的运行 Process all Role runs at once """ - # while not self.message_queue.empty(): - # message = self.message_queue.get() - # rsp = await self.manager.handle(message, self) - # self.message_queue.put(rsp) for _ in range(k): futures = [] for role in self.roles.values(): @@ -65,15 +66,24 @@ class Environment(BaseModel): futures.append(future) await asyncio.gather(*futures) + logger.info(f"is idle: {self.is_idle}") def get_roles(self) -> dict[str, Role]: """获得环境内的所有角色 - Process all Role runs at once + Process all Role runs at once """ return self.roles def get_role(self, name: str) -> Role: """获得环境内的指定角色 - get all the environment roles + get all the environment roles """ return self.roles.get(name, None) + + @property + def is_idle(self): + """If true, all actions have been executed.""" + for r in self.roles.values(): + if not r.is_idle: + return False + return True diff --git a/metagpt/memory/longterm_memory.py b/metagpt/memory/longterm_memory.py index f8abea5f3..b5bb73b6b 100644 --- a/metagpt/memory/longterm_memory.py +++ b/metagpt/memory/longterm_memory.py @@ -1,6 +1,10 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# @Desc : the implement of Long-term memory +""" +@Desc : the implement of Long-term memory +@Modified By: mashenquan, 2023-11-1. Optimization: + 1. Replace code related to message filtering with the `Message.is_recipient` function. +""" from metagpt.logs import logger from metagpt.memory import Memory @@ -36,11 +40,10 @@ class LongTermMemory(Memory): def add(self, message: Message): super(LongTermMemory, self).add(message) - for action in self.rc.watch: - if message.cause_by == action and not self.msg_from_recover: - # currently, only add role's watching messages to its memory_storage - # and ignore adding messages from recover repeatedly - self.memory_storage.add(message) + if message.is_recipient(self.rc.watch) and not self.msg_from_recover: + # currently, only add role's watching messages to its memory_storage + # and ignore adding messages from recover repeatedly + self.memory_storage.add(message) def find_news(self, observed: list[Message], k=0) -> list[Message]: """ @@ -68,4 +71,3 @@ class LongTermMemory(Memory): def clear(self): super(LongTermMemory, self).clear() self.memory_storage.clean() - \ No newline at end of file diff --git a/metagpt/memory/memory.py b/metagpt/memory/memory.py index c818fa707..8e01544f1 100644 --- a/metagpt/memory/memory.py +++ b/metagpt/memory/memory.py @@ -4,11 +4,11 @@ @Time : 2023/5/20 12:15 @Author : alexanderwu @File : memory.py +@Modified By: mashenquan, 2023-11-1. Standardize the design of message filtering-related features. """ from collections import defaultdict -from typing import Iterable, Type +from typing import Iterable, Set -from metagpt.actions import Action from metagpt.schema import Message @@ -18,7 +18,7 @@ class Memory: def __init__(self): """Initialize an empty storage list and an empty index dictionary""" self.storage: list[Message] = [] - self.index: dict[Type[Action], list[Message]] = defaultdict(list) + self.index: dict[str, list[Message]] = defaultdict(list) def add(self, message: Message): """Add a new message to storage, while updating the index""" @@ -73,11 +73,11 @@ class Memory: news.append(i) return news - def get_by_action(self, action: Type[Action]) -> list[Message]: + def get_by_action(self, action: str) -> list[Message]: """Return all messages triggered by a specified Action""" return self.index[action] - def get_by_actions(self, actions: Iterable[Type[Action]]) -> list[Message]: + def get_by_actions(self, actions: Set[str]) -> list[Message]: """Return all messages triggered by specified Actions""" rsp = [] for action in actions: @@ -85,4 +85,3 @@ class Memory: continue rsp += self.index[action] return rsp - \ No newline at end of file diff --git a/metagpt/roles/engineer.py b/metagpt/roles/engineer.py index 6d65575a8..9826ea0b7 100644 --- a/metagpt/roles/engineer.py +++ b/metagpt/roles/engineer.py @@ -4,6 +4,10 @@ @Time : 2023/5/11 14:43 @Author : alexanderwu @File : engineer.py +@Modified By: mashenquan, 2023-11-1. Optimization: + 1. Consolidate message reception and processing logic within `_observe`. + 2. Fix bug: Add logic for handling asynchronous message processing when messages are not ready. + 3. Supplemented the external transmission of internal messages. """ import asyncio import shutil @@ -15,7 +19,7 @@ from metagpt.const import WORKSPACE_ROOT from metagpt.logs import logger from metagpt.roles import Role from metagpt.schema import Message -from metagpt.utils.common import CodeParser +from metagpt.utils.common import CodeParser, get_object_name from metagpt.utils.special_tokens import FILENAME_CODE_SEP, MSG_SEP @@ -75,7 +79,7 @@ class Engineer(Role): self.use_code_review = use_code_review if self.use_code_review: self._init_actions([WriteCode, WriteCodeReview]) - self._watch([WriteTasks]) + self._watch([WriteTasks, WriteDesign]) self.todos = [] self.n_borg = n_borg @@ -96,7 +100,7 @@ class Engineer(Role): return CodeParser.parse_str(block="Python package name", text=system_design_msg.content) def get_workspace(self) -> Path: - msg = self._rc.memory.get_by_action(WriteDesign)[-1] + msg = self._rc.memory.get_by_action(WriteDesign.get_class_name())[-1] if not msg: return WORKSPACE_ROOT / "src" workspace = self.parse_workspace(msg) @@ -119,17 +123,13 @@ class Engineer(Role): file.write_text(code) return file - def recv(self, message: Message) -> None: - self._rc.memory.add(message) - if message in self._rc.important_memory: - self.todos = self.parse_tasks(message) - async def _act_mp(self) -> Message: # self.recreate_workspace() todo_coros = [] for todo in self.todos: todo_coro = WriteCode().run( - context=self._rc.memory.get_by_actions([WriteTasks, WriteDesign]), filename=todo + context=self._rc.memory.get_by_actions([WriteTasks.get_class_name(), WriteDesign.get_class_name()]), + filename=todo, ) todo_coros.append(todo_coro) @@ -139,12 +139,13 @@ class Engineer(Role): logger.info(todo) logger.info(code_rsp) # self.write_file(todo, code) - msg = Message(content=code_rsp, role=self.profile, cause_by=type(self._rc.todo)) + msg = Message(content=code_rsp, role=self.profile, cause_by=get_object_name(self._rc.todo)) self._rc.memory.add(msg) + self.publish_message(msg) del self.todos[0] logger.info(f"Done {self.get_workspace()} generating.") - msg = Message(content="all done.", role=self.profile, cause_by=type(self._rc.todo)) + msg = Message(content="all done.", role=self.profile, cause_by=get_object_name(self._rc.todo)) return msg async def _act_sp(self) -> Message: @@ -155,15 +156,19 @@ class Engineer(Role): # logger.info(code_rsp) # code = self.parse_code(code_rsp) file_path = self.write_file(todo, code) - msg = Message(content=code, role=self.profile, cause_by=type(self._rc.todo)) + msg = Message(content=code, role=self.profile, cause_by=get_object_name(self._rc.todo)) self._rc.memory.add(msg) + self.publish_message(msg) code_msg = todo + FILENAME_CODE_SEP + str(file_path) code_msg_all.append(code_msg) logger.info(f"Done {self.get_workspace()} generating.") msg = Message( - content=MSG_SEP.join(code_msg_all), role=self.profile, cause_by=type(self._rc.todo), send_to="QaEngineer" + content=MSG_SEP.join(code_msg_all), + role=self.profile, + cause_by=get_object_name(self._rc.todo), + tx_to="QaEngineer", ) return msg @@ -178,7 +183,8 @@ class Engineer(Role): TODO: The goal is not to need it. After clear task decomposition, based on the design idea, you should be able to write a single file without needing other codes. If you can't, it means you need a clearer definition. This is the key to writing longer code. """ context = [] - msg = self._rc.memory.get_by_actions([WriteDesign, WriteTasks, WriteCode]) + msg_filters = [WriteDesign.get_class_name(), WriteTasks.get_class_name(), WriteCode.get_class_name()] + msg = self._rc.memory.get_by_actions(msg_filters) for m in msg: context.append(m.content) context_str = "\n".join(context) @@ -193,20 +199,50 @@ class Engineer(Role): logger.error("code review failed!", e) pass file_path = self.write_file(todo, code) - msg = Message(content=code, role=self.profile, cause_by=WriteCode) + msg = Message(content=code, role=self.profile, cause_by=WriteCode.get_class_name()) self._rc.memory.add(msg) + self.publish_message(msg) code_msg = todo + FILENAME_CODE_SEP + str(file_path) code_msg_all.append(code_msg) logger.info(f"Done {self.get_workspace()} generating.") msg = Message( - content=MSG_SEP.join(code_msg_all), role=self.profile, cause_by=type(self._rc.todo), send_to="QaEngineer" + content=MSG_SEP.join(code_msg_all), + role=self.profile, + cause_by=get_object_name(self._rc.todo), + tx_to="QaEngineer", ) return msg async def _act(self) -> Message: """Determines the mode of action based on whether code review is used.""" + if not self._rc.todo: + return None if self.use_code_review: return await self._act_sp_precision() return await self._act_sp() + + async def _observe(self) -> int: + ret = await super(Engineer, self)._observe() + if ret == 0: + return ret + + # Parse task lists + message_filter = {WriteTasks.get_class_name()} + for message in self._rc.news: + if not message.is_recipient(message_filter): + continue + self.todos = self.parse_tasks(message) + + return ret + + async def _think(self) -> None: + # In asynchronous scenarios, first check if the required messages are ready. + filters = {WriteTasks.get_class_name()} + msgs = self._rc.memory.get_by_actions(filters) + if not msgs: + self._rc.todo = None + return + + await super(Engineer, self)._think() diff --git a/metagpt/roles/qa_engineer.py b/metagpt/roles/qa_engineer.py index a763c2ce8..b83ab6e21 100644 --- a/metagpt/roles/qa_engineer.py +++ b/metagpt/roles/qa_engineer.py @@ -4,6 +4,7 @@ @Time : 2023/5/11 14:43 @Author : alexanderwu @File : qa_engineer.py +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. """ import os from pathlib import Path @@ -48,7 +49,7 @@ class QaEngineer(Role): return CodeParser.parse_str(block="Python package name", text=system_design_msg.content) def get_workspace(self, return_proj_dir=True) -> Path: - msg = self._rc.memory.get_by_action(WriteDesign)[-1] + msg = self._rc.memory.get_by_action(WriteDesign.get_class_name())[-1] if not msg: return WORKSPACE_ROOT / "src" workspace = self.parse_workspace(msg) @@ -97,11 +98,11 @@ class QaEngineer(Role): msg = Message( content=str(file_info), role=self.profile, - cause_by=WriteTest, - sent_from=self.profile, - send_to=self.profile, + cause_by=WriteTest.get_class_name(), + tx_from=self.profile, + tx_to=self.profile, ) - self._publish_message(msg) + self.publish_message(msg) logger.info(f"Done {self.get_workspace()}/tests generating.") @@ -131,8 +132,10 @@ class QaEngineer(Role): recipient = parse_recipient(result_msg) # the recipient might be Engineer or myself content = str(file_info) + FILENAME_CODE_SEP + result_msg - msg = Message(content=content, role=self.profile, cause_by=RunCode, sent_from=self.profile, send_to=recipient) - self._publish_message(msg) + msg = Message( + content=content, role=self.profile, cause_by=RunCode.get_class_name(), tx_from=self.profile, tx_to=recipient + ) + self.publish_message(msg) async def _debug_error(self, msg): file_info, context = msg.content.split(FILENAME_CODE_SEP) @@ -141,14 +144,18 @@ class QaEngineer(Role): self.write_file(file_name, code) recipient = msg.sent_from # send back to the one who ran the code for another run, might be one's self msg = Message( - content=file_info, role=self.profile, cause_by=DebugError, sent_from=self.profile, send_to=recipient + content=file_info, + role=self.profile, + cause_by=DebugError.get_class_name(), + tx_from=self.profile, + tx_to=recipient, ) - self._publish_message(msg) + self.publish_message(msg) async def _observe(self) -> int: await super()._observe() self._rc.news = [ - msg for msg in self._rc.news if msg.send_to == self.profile + msg for msg in self._rc.news if msg.is_recipient({self.profile}) ] # only relevant msgs count as observed news return len(self._rc.news) @@ -157,30 +164,31 @@ class QaEngineer(Role): result_msg = Message( content=f"Exceeding {self.test_round_allowed} rounds of tests, skip (writing code counts as a round, too)", role=self.profile, - cause_by=WriteTest, - sent_from=self.profile, - send_to="", + cause_by=WriteTest.get_class_name(), + tx_from=self.profile, ) return result_msg + code_filters = {WriteCode.get_class_name(), WriteCodeReview.get_class_name()} + test_filters = {WriteTest.get_class_name(), DebugError.get_class_name()} + run_filters = {RunCode.get_class_name()} for msg in self._rc.news: # Decide what to do based on observed msg type, currently defined by human, # might potentially be moved to _think, that is, let the agent decides for itself - if msg.cause_by in [WriteCode, WriteCodeReview]: + if msg.is_recipient(code_filters): # engineer wrote a code, time to write a test for it await self._write_test(msg) - elif msg.cause_by in [WriteTest, DebugError]: + elif msg.is_recipient(test_filters): # I wrote or debugged my test code, time to run it await self._run_code(msg) - elif msg.cause_by == RunCode: + elif msg.is_recipient(run_filters): # I ran my test code, time to fix bugs, if any await self._debug_error(msg) self.test_round += 1 result_msg = Message( content=f"Round {self.test_round} of tests done", role=self.profile, - cause_by=WriteTest, - sent_from=self.profile, - send_to="", + cause_by=WriteTest.get_class_name(), + tx_from=self.profile, ) return result_msg diff --git a/metagpt/roles/researcher.py b/metagpt/roles/researcher.py index acb46c718..43ee7971d 100644 --- a/metagpt/roles/researcher.py +++ b/metagpt/roles/researcher.py @@ -1,4 +1,8 @@ #!/usr/bin/env python +""" +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. +""" + import asyncio @@ -10,6 +14,7 @@ from metagpt.const import RESEARCH_PATH from metagpt.logs import logger from metagpt.roles import Role from metagpt.schema import Message +from metagpt.utils.common import get_object_name class Report(BaseModel): @@ -58,18 +63,22 @@ class Researcher(Role): research_system_text = get_research_system_text(topic, self.language) if isinstance(todo, CollectLinks): links = await todo.run(topic, 4, 4) - ret = Message("", Report(topic=topic, links=links), role=self.profile, cause_by=type(todo)) + ret = Message("", Report(topic=topic, links=links), role=self.profile, cause_by=get_object_name(todo)) elif isinstance(todo, WebBrowseAndSummarize): links = instruct_content.links todos = (todo.run(*url, query=query, system_text=research_system_text) for (query, url) in links.items()) summaries = await asyncio.gather(*todos) summaries = list((url, summary) for i in summaries for (url, summary) in i.items() if summary) - ret = Message("", Report(topic=topic, summaries=summaries), role=self.profile, cause_by=type(todo)) + ret = Message( + "", Report(topic=topic, summaries=summaries), role=self.profile, cause_by=get_object_name(todo) + ) else: summaries = instruct_content.summaries summary_text = "\n---\n".join(f"url: {url}\nsummary: {summary}" for (url, summary) in summaries) content = await self._rc.todo.run(topic, summary_text, system_text=research_system_text) - ret = Message("", Report(topic=topic, content=content), role=self.profile, cause_by=type(self._rc.todo)) + ret = Message( + "", Report(topic=topic, content=content), role=self.profile, get_object_name=type(self._rc.todo) + ) self._rc.memory.add(ret) return ret diff --git a/metagpt/roles/role.py b/metagpt/roles/role.py index 44bb3e976..0a6716428 100644 --- a/metagpt/roles/role.py +++ b/metagpt/roles/role.py @@ -4,20 +4,32 @@ @Time : 2023/5/11 14:42 @Author : alexanderwu @File : role.py +@Modified By: mashenquan, 2023-11-1. Optimization: + 1. Merge the `recv` functionality into the `_observe` function. Future message reading operations will be + consolidated within the `_observe` function. + 2. Standardize the message filtering for string label matching. Role objects can access the message labels + they've subscribed to through the `subscribed_tags` property. + 3. Move the message receive buffer from the global variable `self._rc.env.memory` to the role's private variable + `self._rc.msg_buffer` for easier message identification and asynchronous appending of messages. + 4. Standardize the way messages are passed: `publish_message` sends messages out, while `async_put_message` places + messages into the Role object's private message receive buffer. There are no other message transmit methods. + 5. Standardize the parameters for the `run` function: the `test_message` parameter is used for testing purposes + only. In the normal workflow, you should use `publish_message` or `async_put_message` to transmit messages. """ from __future__ import annotations -from typing import Iterable, Type +from typing import Iterable, Set, Type from pydantic import BaseModel, Field -# from metagpt.environment import Environment -from metagpt.config import CONFIG from metagpt.actions import Action, ActionOutput +from metagpt.config import CONFIG from metagpt.llm import LLM from metagpt.logs import logger -from metagpt.memory import Memory, LongTermMemory -from metagpt.schema import Message +from metagpt.memory import LongTermMemory, Memory +from metagpt.schema import Message, MessageQueue +from metagpt.utils.common import get_class_name, get_object_name +from metagpt.utils.named import Named PREFIX_TEMPLATE = """You are a {profile}, named {name}, your goal is {goal}, and the constraint is {constraints}. """ @@ -49,6 +61,7 @@ ROLE_TEMPLATE = """Your response should be based on the previous conversation hi class RoleSetting(BaseModel): """Role Settings""" + name: str profile: str goal: str @@ -64,12 +77,14 @@ class RoleSetting(BaseModel): class RoleContext(BaseModel): """Role Runtime Context""" - env: 'Environment' = Field(default=None) + + env: "Environment" = Field(default=None) + msg_buffer: MessageQueue = Field(default_factory=MessageQueue) # Message Buffer with Asynchronous Updates memory: Memory = Field(default_factory=Memory) long_term_memory: LongTermMemory = Field(default_factory=LongTermMemory) state: int = Field(default=0) todo: Action = Field(default=None) - watch: set[Type[Action]] = Field(default_factory=set) + watch: set[str] = Field(default_factory=set) news: list[Type[Message]] = Field(default=[]) class Config: @@ -90,7 +105,7 @@ class RoleContext(BaseModel): return self.memory.get() -class Role: +class Role(Named): """Role/Agent""" def __init__(self, name="", profile="", goal="", constraints="", desc=""): @@ -118,7 +133,8 @@ class Role: def _watch(self, actions: Iterable[Type[Action]]): """Listen to the corresponding behaviors""" - self._rc.watch.update(actions) + tags = [get_class_name(t) for t in actions] + self._rc.watch.update(tags) # check RoleContext after adding watch actions self._rc.check(self._role_id) @@ -128,7 +144,7 @@ class Role: logger.debug(self._actions) self._rc.todo = self._actions[self._rc.state] - def set_env(self, env: 'Environment'): + def set_env(self, env: "Environment"): """Set the environment in which the role works. The role can talk to the environment and can also receive messages by observing.""" self._rc.env = env @@ -137,6 +153,24 @@ class Role: """Get the role description (position)""" return self._setting.profile + @property + def name(self): + """Get virtual user name""" + return self._setting.name + + @property + def subscribed_tags(self) -> Set: + """The labels for messages to be consumed by the Role object.""" + if self._rc.watch: + return self._rc.watch + return { + self.name, + self.get_object_name(), + self.profile, + f"{self.name}({self.profile})", + f"{self.name}({self.get_object_name()})", + } + def _get_prefix(self): """Get the role prefix""" if self._setting.desc: @@ -150,94 +184,99 @@ class Role: self._set_state(0) return prompt = self._get_prefix() - prompt += STATE_TEMPLATE.format(history=self._rc.history, states="\n".join(self._states), - n_states=len(self._states) - 1) + prompt += STATE_TEMPLATE.format( + history=self._rc.history, states="\n".join(self._states), n_states=len(self._states) - 1 + ) next_state = await self._llm.aask(prompt) logger.debug(f"{prompt=}") if not next_state.isdigit() or int(next_state) not in range(len(self._states)): - logger.warning(f'Invalid answer of state, {next_state=}') + logger.warning(f"Invalid answer of state, {next_state=}") next_state = "0" self._set_state(int(next_state)) async def _act(self) -> Message: - # prompt = self.get_prefix() - # prompt += ROLE_TEMPLATE.format(name=self.profile, state=self.states[self.state], result=response, - # history=self.history) - logger.info(f"{self._setting}: ready to {self._rc.todo}") response = await self._rc.todo.run(self._rc.important_memory) - # logger.info(response) if isinstance(response, ActionOutput): - msg = Message(content=response.content, instruct_content=response.instruct_content, - role=self.profile, cause_by=type(self._rc.todo)) + msg = Message( + content=response.content, + instruct_content=response.instruct_content, + role=self.profile, + cause_by=get_object_name(self._rc.todo), + tx_from=get_object_name(self), + ) else: - msg = Message(content=response, role=self.profile, cause_by=type(self._rc.todo)) - self._rc.memory.add(msg) - # logger.debug(f"{response}") + msg = Message( + content=response, + role=self.profile, + cause_by=get_object_name(self._rc.todo), + tx_from=get_object_name(self), + ) return msg async def _observe(self) -> int: - """Observe from the environment, obtain important information, and add it to memory""" - if not self._rc.env: - return 0 - env_msgs = self._rc.env.memory.get() - - observed = self._rc.env.memory.get_by_actions(self._rc.watch) - - self._rc.news = self._rc.memory.find_news(observed) # find news (previously unseen messages) from observed messages - - for i in env_msgs: - self.recv(i) + """Prepare new messages for processing from the message buffer and other sources.""" + # Read unprocessed messages from the msg buffer. + self._rc.news = self._rc.msg_buffer.pop_all() + # Store the read messages in your own memory to prevent duplicate processing. + self._rc.memory.add_batch(self._rc.news) + # Design Rules: + # If you need to further categorize Message objects, you can do so using the Message.set_meta function. + # msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer. news_text = [f"{i.role}: {i.content[:20]}..." for i in self._rc.news] if news_text: - logger.debug(f'{self._setting} observed: {news_text}') + logger.debug(f"{self._setting} observed: {news_text}") return len(self._rc.news) - def _publish_message(self, msg): + def publish_message(self, msg): """If the role belongs to env, then the role's messages will be broadcast to env""" + if not msg: + return if not self._rc.env: # If env does not exist, do not publish the message return self._rc.env.publish_message(msg) + def async_put_message(self, message): + """Place the message into the Role object's private message buffer.""" + if not message: + return + self._rc.msg_buffer.push(message) + async def _react(self) -> Message: """Think first, then act""" await self._think() logger.debug(f"{self._setting}: {self._rc.state=}, will do {self._rc.todo}") return await self._act() - def recv(self, message: Message) -> None: - """add message to history.""" - # self._history += f"\n{message}" - # self._context = self._history - if message in self._rc.memory.get(): - return - self._rc.memory.add(message) - - async def handle(self, message: Message) -> Message: - """Receive information and reply with actions""" - # logger.debug(f"{self.name=}, {self.profile=}, {message.role=}") - self.recv(message) - - return await self._react() - - async def run(self, message=None): + async def run(self, test_message=None): """Observe, and think and act based on the results of the observation""" - if message: - if isinstance(message, str): - message = Message(message) - if isinstance(message, Message): - self.recv(message) - if isinstance(message, list): - self.recv(Message("\n".join(message))) - elif not await self._observe(): + if test_message: # For test + seed = None + if isinstance(test_message, str): + seed = Message(test_message) + elif isinstance(test_message, Message): + seed = test_message + elif isinstance(test_message, list): + seed = Message("\n".join(test_message)) + self.async_put_message(seed) + + if not await self._observe(): # If there is no new information, suspend and wait logger.debug(f"{self._setting}: no news. waiting.") return rsp = await self._react() - # Publish the reply to the environment, waiting for the next subscriber to process - self._publish_message(rsp) + + # Reset the next action to be taken. + self._rc.todo = None + # Send the response message to the Environment object to have it relay the message to the subscribers. + self.publish_message(rsp) return rsp + + @property + def is_idle(self) -> bool: + """If true, all actions have been executed.""" + return not self._rc.news and not self._rc.todo and self._rc.msg_buffer.empty() diff --git a/metagpt/roles/seacher.py b/metagpt/roles/seacher.py index 0b6e089da..95be89277 100644 --- a/metagpt/roles/seacher.py +++ b/metagpt/roles/seacher.py @@ -4,18 +4,20 @@ @Time : 2023/5/23 17:25 @Author : alexanderwu @File : seacher.py +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. """ from metagpt.actions import ActionOutput, SearchAndSummarize from metagpt.logs import logger from metagpt.roles import Role from metagpt.schema import Message from metagpt.tools import SearchEngineType +from metagpt.utils.common import get_object_name class Searcher(Role): """ Represents a Searcher role responsible for providing search services to users. - + Attributes: name (str): Name of the searcher. profile (str): Role profile. @@ -23,17 +25,19 @@ class Searcher(Role): constraints (str): Constraints or limitations for the searcher. engine (SearchEngineType): The type of search engine to use. """ - - def __init__(self, - name: str = 'Alice', - profile: str = 'Smart Assistant', - goal: str = 'Provide search services for users', - constraints: str = 'Answer is rich and complete', - engine=SearchEngineType.SERPAPI_GOOGLE, - **kwargs) -> None: + + def __init__( + self, + name: str = "Alice", + profile: str = "Smart Assistant", + goal: str = "Provide search services for users", + constraints: str = "Answer is rich and complete", + engine=SearchEngineType.SERPAPI_GOOGLE, + **kwargs, + ) -> None: """ Initializes the Searcher role with given attributes. - + Args: name (str): Name of the searcher. profile (str): Role profile. @@ -53,12 +57,16 @@ class Searcher(Role): """Performs the search action in a single process.""" logger.info(f"{self._setting}: ready to {self._rc.todo}") response = await self._rc.todo.run(self._rc.memory.get(k=0)) - + if isinstance(response, ActionOutput): - msg = Message(content=response.content, instruct_content=response.instruct_content, - role=self.profile, cause_by=type(self._rc.todo)) + msg = Message( + content=response.content, + instruct_content=response.instruct_content, + role=self.profile, + cause_by=get_object_name(self._rc.todo), + ) else: - msg = Message(content=response, role=self.profile, cause_by=type(self._rc.todo)) + msg = Message(content=response, role=self.profile, cause_by=get_object_name(self._rc.todo)) self._rc.memory.add(msg) return msg diff --git a/metagpt/roles/sk_agent.py b/metagpt/roles/sk_agent.py index b27841d74..abebb9605 100644 --- a/metagpt/roles/sk_agent.py +++ b/metagpt/roles/sk_agent.py @@ -4,6 +4,7 @@ @Time : 2023/9/13 12:23 @Author : femto Zheng @File : sk_agent.py +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. """ from semantic_kernel.planning import SequentialPlanner from semantic_kernel.planning.action_planner.action_planner import ActionPlanner @@ -14,6 +15,7 @@ from metagpt.actions.execute_task import ExecuteTask from metagpt.logs import logger from metagpt.roles import Role from metagpt.schema import Message +from metagpt.utils.common import get_object_name from metagpt.utils.make_sk_kernel import make_sk_kernel @@ -70,7 +72,7 @@ class SkAgent(Role): result = (await self.plan.invoke_async()).result logger.info(result) - msg = Message(content=result, role=self.profile, cause_by=type(self._rc.todo)) + msg = Message(content=result, role=self.profile, cause_by=get_object_name(self._rc.todo)) self._rc.memory.add(msg) - # logger.debug(f"{response}") + self.publish_message(msg) return msg diff --git a/metagpt/schema.py b/metagpt/schema.py index 1124fb28e..e0d17e0ed 100644 --- a/metagpt/schema.py +++ b/metagpt/schema.py @@ -8,12 +8,20 @@ """ from __future__ import annotations +import asyncio import json +from asyncio import Queue, QueueEmpty, wait_for from json import JSONDecodeError -from typing import Dict, List, TypedDict +from typing import Dict, List, Set, TypedDict from pydantic import BaseModel, Field +from metagpt.const import ( + MESSAGE_META_ROLE, + MESSAGE_ROUTE_CAUSE_BY, + MESSAGE_ROUTE_FROM, + MESSAGE_ROUTE_TO, +) from metagpt.logs import logger @@ -22,44 +30,150 @@ class RawMessage(TypedDict): role: str +class Routes(BaseModel): + """Responsible for managing routing information for the Message class.""" + + routes: List[Dict] = Field(default_factory=list) + + def set_from(self, value): + """Set the label of the message sender.""" + route = self._get_route() + route[MESSAGE_ROUTE_FROM] = value + + def set_to(self, tags: Set): + """Set the labels of the message recipient.""" + route = self._get_route() + if tags: + route[MESSAGE_ROUTE_TO] = tags + return + + if MESSAGE_ROUTE_TO in route: + del route[MESSAGE_ROUTE_TO] + + def add_to(self, tag: str): + """Add a label of the message recipient.""" + route = self._get_route() + tags = route.get(MESSAGE_ROUTE_TO, set()) + tags.add(tag) + route[MESSAGE_ROUTE_TO] = tags + + def _get_route(self) -> Dict: + if not self.routes: + self.routes.append({}) + return self.routes[0] + + def is_recipient(self, tags: Set) -> bool: + """Check if it is the message recipient.""" + route = self._get_route() + to_tags = route.get(MESSAGE_ROUTE_TO) + if not to_tags: + return True + + for k in tags: + if k in to_tags: + return True + return False + + @property + def tx_from(self): + """Message route info tells who sent this message.""" + route = self._get_route() + return route.get(MESSAGE_ROUTE_FROM) + + @property + def tx_to(self): + """Labels for the consumer to filter its subscribed messages.""" + route = self._get_route() + return route.get(MESSAGE_ROUTE_TO) + + class Message(BaseModel): """list[: ]""" content: str instruct_content: BaseModel = None meta_info: Dict = Field(default_factory=dict) - route: List[Dict] = Field(default_factory=list) + route: Routes = Field(default_factory=Routes) def __init__(self, content, **kwargs): + """ + :param content: Message content. + :param instruct_content: Message content struct. + :param meta_info: Message meta info. + :param route: Message route configuration. + :param tx_from: Message route info tells who sent this message. + :param tx_to: Labels for the consumer to filter its subscribed messages. + :param cause_by: Labels for the consumer to filter its subscribed messages, also serving as meta info. + :param role: Message meta info tells who sent this message. + """ super(Message, self).__init__( content=content or kwargs.get("content"), instruct_content=kwargs.get("instruct_content"), meta_info=kwargs.get("meta_info", {}), - route=kwargs.get("route", []), + route=kwargs.get("route", Routes()), ) attribute_names = Message.__annotations__.keys() for k, v in kwargs.items(): if k in attribute_names: continue + if k == MESSAGE_ROUTE_FROM: + self.set_from(v) + continue + if k == MESSAGE_ROUTE_CAUSE_BY: + self.meta_info[k] = v + if k == MESSAGE_ROUTE_TO or k == MESSAGE_ROUTE_CAUSE_BY: + self.add_to(v) + continue self.meta_info[k] = v def get_meta(self, key): + """Get meta info""" return self.meta_info.get(key) def set_meta(self, key, value): + """Set meta info""" self.meta_info[key] = value @property def role(self): - return self.get_meta("role") + """Message meta info tells who sent this message.""" + return self.get_meta(MESSAGE_META_ROLE) @property def cause_by(self): - return self.get_meta("cause_by") + """Labels for the consumer to filter its subscribed messages, also serving as meta info.""" + return self.get_meta(MESSAGE_ROUTE_CAUSE_BY) + + @property + def tx_from(self): + """Message route info tells who sent this message.""" + return self.route.tx_from + + @property + def tx_to(self): + """Labels for the consumer to filter its subscribed messages.""" + return self.route.tx_to def set_role(self, v): - self.set_meta("role", v) + """Set the message's meta info indicating the sender.""" + self.set_meta(MESSAGE_META_ROLE, v) + + def set_from(self, v): + """Set the message's meta info indicating the sender.""" + self.route.set_from(v) + + def set_to(self, tags: Set): + """Set the message's meta info indicating the sender.""" + self.route.set_to(tags) + + def add_to(self, tag: str): + """Add a subscription label for the recipients.""" + self.route.add_to(tag) + + def is_recipient(self, tags: Set): + """Return true if any input label exists in the message's subscription labels.""" + return self.route.is_recipient(tags) def __str__(self): # prefix = '-'.join([self.role, str(self.cause_by)]) @@ -69,13 +183,16 @@ class Message(BaseModel): return self.__str__() def to_dict(self) -> dict: + """Return a dict containing `role` and `content` for the LLM call.l""" return {"role": self.role, "content": self.content} def save(self) -> str: + """Convert the object to json string""" return self.json(exclude_none=True) @staticmethod def load(v): + """Convert the json string to object.""" try: d = json.loads(v) return Message(**d) @@ -90,7 +207,7 @@ class UserMessage(Message): """ def __init__(self, content: str): - super(Message, self).__init__(content=content, meta_info={"role": "user"}) + super().__init__(content=content, role="user") class SystemMessage(Message): @@ -99,7 +216,7 @@ class SystemMessage(Message): """ def __init__(self, content: str): - super().__init__(content=content, meta_info={"role": "system"}) + super().__init__(content=content, role="system") class AIMessage(Message): @@ -108,7 +225,65 @@ class AIMessage(Message): """ def __init__(self, content: str): - super().__init__(content=content, meta_info={"role": "assistant"}) + super().__init__(content=content, role="assistant") + + +class MessageQueue: + def __init__(self): + self._queue = Queue() + + def pop(self) -> Message | None: + try: + item = self._queue.get_nowait() + if item: + self._queue.task_done() + return item + except QueueEmpty: + return None + + def pop_all(self) -> List[Message]: + ret = [] + while True: + msg = self.pop() + if not msg: + break + ret.append(msg) + return ret + + def push(self, msg: Message): + self._queue.put_nowait(msg) + + def empty(self): + return self._queue.empty() + + async def save(self) -> str: + if self.empty(): + return "[]" + + lst = [] + try: + while True: + item = await wait_for(self._queue.get(), timeout=1.0) + if item is None: + break + lst.append(item.dict(exclude_none=True)) + self._queue.task_done() + except asyncio.TimeoutError: + logger.debug("Queue is empty, exiting...") + return json.dumps(lst) + + @staticmethod + def load(self, v) -> "MessageQueue": + q = MessageQueue() + try: + lst = json.loads(v) + for i in lst: + msg = Message(**i) + q.push(msg) + except JSONDecodeError as e: + logger.warning(f"JSON load failed: {v}, error:{e}") + + return q if __name__ == "__main__": diff --git a/metagpt/software_company.py b/metagpt/software_company.py index b2bd18c58..4bedec0e1 100644 --- a/metagpt/software_company.py +++ b/metagpt/software_company.py @@ -4,6 +4,9 @@ @Time : 2023/5/12 00:30 @Author : alexanderwu @File : software_company.py +@Modified By: mashenquan, 2023-11-1. Optimization: + 1. Standardize the design of message filtering-related features. + 2. Abandon the design of having `Environment` store all messages. """ from pydantic import BaseModel, Field @@ -14,13 +17,15 @@ from metagpt.logs import logger from metagpt.roles import Role from metagpt.schema import Message from metagpt.utils.common import NoMoneyException +from metagpt.utils.named import Named -class SoftwareCompany(BaseModel): +class SoftwareCompany(BaseModel, Named): """ Software Company: Possesses a team, SOP (Standard Operating Procedures), and a platform for instant messaging, dedicated to writing executable code. """ + environment: Environment = Field(default_factory=Environment) investment: float = Field(default=10.0) idea: str = Field(default="") @@ -36,16 +41,23 @@ class SoftwareCompany(BaseModel): """Invest company. raise NoMoneyException when exceed max_budget.""" self.investment = investment CONFIG.max_budget = investment - logger.info(f'Investment: ${investment}.') + logger.info(f"Investment: ${investment}.") def _check_balance(self): if CONFIG.total_cost > CONFIG.max_budget: - raise NoMoneyException(CONFIG.total_cost, f'Insufficient funds: {CONFIG.max_budget}') + raise NoMoneyException(CONFIG.total_cost, f"Insufficient funds: {CONFIG.max_budget}") def start_project(self, idea): """Start a project from publishing boss requirement.""" self.idea = idea - self.environment.publish_message(Message(role="BOSS", content=idea, cause_by=BossRequirement)) + self.environment.publish_message( + Message( + role="BOSS", + content=idea, + cause_by=BossRequirement.get_class_name(), + tx_from=SoftwareCompany.get_class_name(), + ) + ) def _save(self): logger.info(self.json()) @@ -58,5 +70,3 @@ class SoftwareCompany(BaseModel): logger.debug(f"{n_round=}") self._check_balance() await self.environment.run() - return self.environment.history - \ No newline at end of file diff --git a/metagpt/utils/common.py b/metagpt/utils/common.py index f09666beb..df4688378 100644 --- a/metagpt/utils/common.py +++ b/metagpt/utils/common.py @@ -85,10 +85,7 @@ class OutputParser: @staticmethod def parse_python_code(text: str) -> str: - for pattern in ( - r"(.*?```python.*?\s+)?(?P.*)(```.*?)", - r"(.*?```python.*?\s+)?(?P.*)", - ): + for pattern in (r"(.*?```python.*?\s+)?(?P.*)(```.*?)", r"(.*?```python.*?\s+)?(?P.*)"): match = re.search(pattern, text, re.DOTALL) if not match: continue @@ -305,3 +302,14 @@ def parse_recipient(text): pattern = r"## Send To:\s*([A-Za-z]+)\s*?" # hard code for now recipient = re.search(pattern, text) return recipient.group(1) if recipient else "" + + +def get_class_name(cls) -> str: + """Return class name""" + return f"{cls.__module__}.{cls.__name__}" + + +def get_object_name(obj) -> str: + """Return class name of the object""" + cls = type(obj) + return f"{cls.__module__}.{cls.__name__}" diff --git a/metagpt/utils/named.py b/metagpt/utils/named.py new file mode 100644 index 000000000..e4da574e8 --- /dev/null +++ b/metagpt/utils/named.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@Time : 2023/11/1 +@Author : mashenquan +@File : named.py +""" + + +class Named: + """A base class with functions for converting classes to names and objects to class names.""" + + @classmethod + def get_class_name(cls): + """Return class name""" + return f"{cls.__module__}.{cls.__name__}" + + def get_object_name(self): + """Return class name of the object""" + cls = type(self) + return f"{cls.__module__}.{cls.__name__}" diff --git a/tests/metagpt/actions/test_write_prd.py b/tests/metagpt/actions/test_write_prd.py index 38e4e5221..40ab20dad 100644 --- a/tests/metagpt/actions/test_write_prd.py +++ b/tests/metagpt/actions/test_write_prd.py @@ -4,6 +4,7 @@ @Time : 2023/5/11 17:45 @Author : alexanderwu @File : test_write_prd.py +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. """ import pytest @@ -17,7 +18,7 @@ from metagpt.schema import Message async def test_write_prd(): product_manager = ProductManager() requirements = "开发一个基于大语言模型与私有知识库的搜索引擎,希望可以基于大语言模型进行搜索总结" - prd = await product_manager.handle(Message(content=requirements, cause_by=BossRequirement)) + prd = await product_manager.handle(Message(content=requirements, cause_by=BossRequirement.get_class_name())) logger.info(requirements) logger.info(prd) diff --git a/tests/metagpt/memory/test_longterm_memory.py b/tests/metagpt/memory/test_longterm_memory.py index dc5540520..c40d7ab9d 100644 --- a/tests/metagpt/memory/test_longterm_memory.py +++ b/tests/metagpt/memory/test_longterm_memory.py @@ -1,12 +1,15 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# @Desc : unittest of `metagpt/memory/longterm_memory.py` +""" +@Desc : unittest of `metagpt/memory/longterm_memory.py` +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. +""" -from metagpt.config import CONFIG -from metagpt.schema import Message from metagpt.actions import BossRequirement -from metagpt.roles.role import RoleContext +from metagpt.config import CONFIG from metagpt.memory import LongTermMemory +from metagpt.roles.role import RoleContext +from metagpt.schema import Message def test_ltm_search(): @@ -14,25 +17,25 @@ def test_ltm_search(): openai_api_key = CONFIG.openai_api_key assert len(openai_api_key) > 20 - role_id = 'UTUserLtm(Product Manager)' - rc = RoleContext(watch=[BossRequirement]) + role_id = "UTUserLtm(Product Manager)" + rc = RoleContext(watch=[BossRequirement.get_class_name()]) ltm = LongTermMemory() ltm.recover_memory(role_id, rc) - idea = 'Write a cli snake game' - message = Message(role='BOSS', content=idea, cause_by=BossRequirement) + idea = "Write a cli snake game" + message = Message(role="BOSS", content=idea, cause_by=BossRequirement.get_class_name()) news = ltm.find_news([message]) assert len(news) == 1 ltm.add(message) - sim_idea = 'Write a game of cli snake' - sim_message = Message(role='BOSS', content=sim_idea, cause_by=BossRequirement) + sim_idea = "Write a game of cli snake" + sim_message = Message(role="BOSS", content=sim_idea, cause_by=BossRequirement.get_class_name()) news = ltm.find_news([sim_message]) assert len(news) == 0 ltm.add(sim_message) - new_idea = 'Write a 2048 web game' - new_message = Message(role='BOSS', content=new_idea, cause_by=BossRequirement) + new_idea = "Write a 2048 web game" + new_message = Message(role="BOSS", content=new_idea, cause_by=BossRequirement.get_class_name()) news = ltm.find_news([new_message]) assert len(news) == 1 ltm.add(new_message) @@ -47,8 +50,8 @@ def test_ltm_search(): news = ltm_new.find_news([sim_message]) assert len(news) == 0 - new_idea = 'Write a Battle City' - new_message = Message(role='BOSS', content=new_idea, cause_by=BossRequirement) + new_idea = "Write a Battle City" + new_message = Message(role="BOSS", content=new_idea, cause_by=BossRequirement.get_class_name()) news = ltm_new.find_news([new_message]) assert len(news) == 1 diff --git a/tests/metagpt/memory/test_memory_storage.py b/tests/metagpt/memory/test_memory_storage.py index 6bb3e8f1d..881b47d6f 100644 --- a/tests/metagpt/memory/test_memory_storage.py +++ b/tests/metagpt/memory/test_memory_storage.py @@ -1,20 +1,23 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# @Desc : the unittests of metagpt/memory/memory_storage.py +""" +@Desc : the unittests of metagpt/memory/memory_storage.py +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. +""" + from typing import List +from metagpt.actions import BossRequirement, WritePRD +from metagpt.actions.action_output import ActionOutput from metagpt.memory.memory_storage import MemoryStorage from metagpt.schema import Message -from metagpt.actions import BossRequirement -from metagpt.actions import WritePRD -from metagpt.actions.action_output import ActionOutput def test_idea_message(): - idea = 'Write a cli snake game' - role_id = 'UTUser1(Product Manager)' - message = Message(role='BOSS', content=idea, cause_by=BossRequirement) + idea = "Write a cli snake game" + role_id = "UTUser1(Product Manager)" + message = Message(role="BOSS", content=idea, cause_by=BossRequirement.get_class_name()) memory_storage: MemoryStorage = MemoryStorage() messages = memory_storage.recover_memory(role_id) @@ -23,13 +26,13 @@ def test_idea_message(): memory_storage.add(message) assert memory_storage.is_initialized is True - sim_idea = 'Write a game of cli snake' - sim_message = Message(role='BOSS', content=sim_idea, cause_by=BossRequirement) + sim_idea = "Write a game of cli snake" + sim_message = Message(role="BOSS", content=sim_idea, cause_by=BossRequirement.get_class_name()) new_messages = memory_storage.search(sim_message) - assert len(new_messages) == 0 # similar, return [] + assert len(new_messages) == 0 # similar, return [] - new_idea = 'Write a 2048 web game' - new_message = Message(role='BOSS', content=new_idea, cause_by=BossRequirement) + new_idea = "Write a 2048 web game" + new_message = Message(role="BOSS", content=new_idea, cause_by=BossRequirement.get_class_name()) new_messages = memory_storage.search(new_message) assert new_messages[0].content == message.content @@ -38,22 +41,15 @@ def test_idea_message(): def test_actionout_message(): - out_mapping = { - 'field1': (str, ...), - 'field2': (List[str], ...) - } - out_data = { - 'field1': 'field1 value', - 'field2': ['field2 value1', 'field2 value2'] - } - ic_obj = ActionOutput.create_model_class('prd', out_mapping) + out_mapping = {"field1": (str, ...), "field2": (List[str], ...)} + out_data = {"field1": "field1 value", "field2": ["field2 value1", "field2 value2"]} + ic_obj = ActionOutput.create_model_class("prd", out_mapping) - role_id = 'UTUser2(Architect)' - content = 'The boss has requested the creation of a command-line interface (CLI) snake game' - message = Message(content=content, - instruct_content=ic_obj(**out_data), - role='user', - cause_by=WritePRD) # WritePRD as test action + role_id = "UTUser2(Architect)" + content = "The boss has requested the creation of a command-line interface (CLI) snake game" + message = Message( + content=content, instruct_content=ic_obj(**out_data), role="user", cause_by=WritePRD.get_class_name() + ) # WritePRD as test action memory_storage: MemoryStorage = MemoryStorage() messages = memory_storage.recover_memory(role_id) @@ -62,19 +58,17 @@ def test_actionout_message(): memory_storage.add(message) assert memory_storage.is_initialized is True - sim_conent = 'The request is command-line interface (CLI) snake game' - sim_message = Message(content=sim_conent, - instruct_content=ic_obj(**out_data), - role='user', - cause_by=WritePRD) + sim_conent = "The request is command-line interface (CLI) snake game" + sim_message = Message( + content=sim_conent, instruct_content=ic_obj(**out_data), role="user", cause_by=WritePRD.get_class_name() + ) new_messages = memory_storage.search(sim_message) - assert len(new_messages) == 0 # similar, return [] + assert len(new_messages) == 0 # similar, return [] - new_conent = 'Incorporate basic features of a snake game such as scoring and increasing difficulty' - new_message = Message(content=new_conent, - instruct_content=ic_obj(**out_data), - role='user', - cause_by=WritePRD) + new_conent = "Incorporate basic features of a snake game such as scoring and increasing difficulty" + new_message = Message( + content=new_conent, instruct_content=ic_obj(**out_data), role="user", cause_by=WritePRD.get_class_name() + ) new_messages = memory_storage.search(new_message) assert new_messages[0].content == message.content diff --git a/tests/metagpt/planner/test_action_planner.py b/tests/metagpt/planner/test_action_planner.py index 5ab9a493f..a3831c08d 100644 --- a/tests/metagpt/planner/test_action_planner.py +++ b/tests/metagpt/planner/test_action_planner.py @@ -4,6 +4,9 @@ @Time : 2023/9/16 20:03 @Author : femto Zheng @File : test_basic_planner.py +@Modified By: mashenquan, 2023-11-1. Optimization: + 1. Standardize the usage of message filtering-related features. + 2. Standardize the usage of message transmission. """ import pytest from semantic_kernel.core_skills import FileIOSkill, MathSkill, TextSkill, TimeSkill @@ -23,7 +26,7 @@ async def test_action_planner(): role.import_skill(TimeSkill(), "time") role.import_skill(TextSkill(), "text") task = "What is the sum of 110 and 990?" - role.recv(Message(content=task, cause_by=BossRequirement)) - + role.async_put_message(Message(content=task, cause_by=BossRequirement.get_class_name())) + await role._observe() await role._think() # it will choose mathskill.Add assert "1100" == (await role._act()).content diff --git a/tests/metagpt/planner/test_basic_planner.py b/tests/metagpt/planner/test_basic_planner.py index 03a82ec5e..9efcb9367 100644 --- a/tests/metagpt/planner/test_basic_planner.py +++ b/tests/metagpt/planner/test_basic_planner.py @@ -4,6 +4,9 @@ @Time : 2023/9/16 20:03 @Author : femto Zheng @File : test_basic_planner.py +@Modified By: mashenquan, 2023-11-1. Optimization: + 1. Standardize the usage of message filtering-related features. + 2. Standardize the usage of message transmission. """ import pytest from semantic_kernel.core_skills import TextSkill @@ -26,7 +29,8 @@ async def test_basic_planner(): role.import_semantic_skill_from_directory(SKILL_DIRECTORY, "WriterSkill") role.import_skill(TextSkill(), "TextSkill") # using BasicPlanner - role.recv(Message(content=task, cause_by=BossRequirement)) + role.async_put_message(Message(content=task, cause_by=BossRequirement.get_class_name())) + await role._observe() await role._think() # assuming sk_agent will think he needs WriterSkill.Brainstorm and WriterSkill.Translate assert "WriterSkill.Brainstorm" in role.plan.generated_plan.result diff --git a/tests/metagpt/roles/mock.py b/tests/metagpt/roles/mock.py index 52fc4a3c1..b9891cd81 100644 --- a/tests/metagpt/roles/mock.py +++ b/tests/metagpt/roles/mock.py @@ -4,6 +4,7 @@ @Time : 2023/5/12 13:05 @Author : alexanderwu @File : mock.py +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. """ from metagpt.actions import BossRequirement, WriteDesign, WritePRD, WriteTasks from metagpt.schema import Message @@ -71,7 +72,7 @@ PRD = '''## 原始需求 ``` ''' -SYSTEM_DESIGN = '''## Python package name +SYSTEM_DESIGN = """## Python package name ```python "smart_search_engine" ``` @@ -149,10 +150,10 @@ sequenceDiagram S-->>SE: return summary SE-->>M: return summary ``` -''' +""" -TASKS = '''## Logic Analysis +TASKS = """## Logic Analysis 在这个项目中,所有的模块都依赖于“SearchEngine”类,这是主入口,其他的模块(Index、Ranking和Summary)都通过它交互。另外,"Index"类又依赖于"KnowledgeBase"类,因为它需要从知识库中获取数据。 @@ -181,7 +182,7 @@ task_list = [ ] ``` 这个任务列表首先定义了最基础的模块,然后是依赖这些模块的模块,最后是辅助模块。可以根据团队的能力和资源,同时开发多个任务,只要满足依赖关系。例如,在开发"search.py"之前,可以同时开发"knowledge_base.py"、"index.py"、"ranking.py"和"summary.py"。 -''' +""" TASKS_TOMATO_CLOCK = '''## Required Python third-party packages: Provided in requirements.txt format @@ -224,35 +225,35 @@ task_list = [ TASK = """smart_search_engine/knowledge_base.py""" STRS_FOR_PARSING = [ -""" + """ ## 1 ```python a ``` """, -""" + """ ##2 ```python "a" ``` """, -""" + """ ## 3 ```python a = "a" ``` """, -""" + """ ## 4 ```python a = 'a' ``` -""" +""", ] class MockMessages: - req = Message(role="Boss", content=BOSS_REQUIREMENT, cause_by=BossRequirement) - prd = Message(role="Product Manager", content=PRD, cause_by=WritePRD) - system_design = Message(role="Architect", content=SYSTEM_DESIGN, cause_by=WriteDesign) - tasks = Message(role="Project Manager", content=TASKS, cause_by=WriteTasks) + req = Message(role="Boss", content=BOSS_REQUIREMENT, cause_by=BossRequirement.get_class_name()) + prd = Message(role="Product Manager", content=PRD, cause_by=WritePRD.get_class_name()) + system_design = Message(role="Architect", content=SYSTEM_DESIGN, cause_by=WriteDesign.get_class_name()) + tasks = Message(role="Project Manager", content=TASKS, cause_by=WriteTasks.get_class_name()) diff --git a/tests/metagpt/roles/test_architect.py b/tests/metagpt/roles/test_architect.py index d44e0d923..910c589ca 100644 --- a/tests/metagpt/roles/test_architect.py +++ b/tests/metagpt/roles/test_architect.py @@ -4,6 +4,7 @@ @Time : 2023/5/20 14:37 @Author : alexanderwu @File : test_architect.py +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message transmission. """ import pytest @@ -15,7 +16,7 @@ from tests.metagpt.roles.mock import MockMessages @pytest.mark.asyncio async def test_architect(): role = Architect() - role.recv(MockMessages.req) - rsp = await role.handle(MockMessages.prd) + role.async_put_message(MockMessages.req) + rsp = await role.run(MockMessages.prd) logger.info(rsp) assert len(rsp.content) > 0 diff --git a/tests/metagpt/roles/test_engineer.py b/tests/metagpt/roles/test_engineer.py index c0c48d0b1..e80234b3b 100644 --- a/tests/metagpt/roles/test_engineer.py +++ b/tests/metagpt/roles/test_engineer.py @@ -4,6 +4,7 @@ @Time : 2023/5/12 10:14 @Author : alexanderwu @File : test_engineer.py +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message transmission. """ import pytest @@ -22,10 +23,10 @@ from tests.metagpt.roles.mock import ( async def test_engineer(): engineer = Engineer() - engineer.recv(MockMessages.req) - engineer.recv(MockMessages.prd) - engineer.recv(MockMessages.system_design) - rsp = await engineer.handle(MockMessages.tasks) + engineer.async_put_message(MockMessages.req) + engineer.async_put_message(MockMessages.prd) + engineer.async_put_message(MockMessages.system_design) + rsp = await engineer.run(MockMessages.tasks) logger.info(rsp) assert "all done." == rsp.content @@ -35,13 +36,13 @@ def test_parse_str(): for idx, i in enumerate(STRS_FOR_PARSING): text = CodeParser.parse_str(f"{idx+1}", i) # logger.info(text) - assert text == 'a' + assert text == "a" def test_parse_blocks(): tasks = CodeParser.parse_blocks(TASKS) logger.info(tasks.keys()) - assert 'Task list' in tasks.keys() + assert "Task list" in tasks.keys() target_list = [ diff --git a/tests/metagpt/test_environment.py b/tests/metagpt/test_environment.py index a0f1f6257..755798b17 100644 --- a/tests/metagpt/test_environment.py +++ b/tests/metagpt/test_environment.py @@ -4,6 +4,7 @@ @Time : 2023/5/12 00:47 @Author : alexanderwu @File : test_environment.py +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message transmission. """ import pytest @@ -49,7 +50,7 @@ async def test_publish_and_process_message(env: Environment): env.add_roles([product_manager, architect]) env.set_manager(Manager()) - env.publish_message(Message(role="BOSS", content="需要一个基于LLM做总结的搜索引擎", cause_by=BossRequirement)) + env.publish_message(Message(role="BOSS", content="需要一个基于LLM做总结的搜索引擎", cause_by=BossRequirement.get_class_name())) await env.run(k=2) logger.info(f"{env.history=}") diff --git a/tests/metagpt/utils/test_serialize.py b/tests/metagpt/utils/test_serialize.py index 69f317f79..5a0840c87 100644 --- a/tests/metagpt/utils/test_serialize.py +++ b/tests/metagpt/utils/test_serialize.py @@ -1,6 +1,9 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# @Desc : the unittest of serialize +""" +@Desc : the unittest of serialize +@Modified By: mashenquan, 2023-11-1. Standardize the usage of message filtering-related features. +""" from typing import List, Tuple @@ -55,7 +58,7 @@ def test_serialize_and_deserialize_message(): ic_obj = ActionOutput.create_model_class("prd", out_mapping) message = Message( - content="prd demand", instruct_content=ic_obj(**out_data), role="user", cause_by=WritePRD + content="prd demand", instruct_content=ic_obj(**out_data), role="user", cause_by=WritePRD.get_class_name() ) # WritePRD as test action message_ser = serialize_message(message)