diff --git a/metagpt/environment/mgx/mgx_env.py b/metagpt/environment/mgx/mgx_env.py index 53690f7d7..a8fc0df9f 100644 --- a/metagpt/environment/mgx/mgx_env.py +++ b/metagpt/environment/mgx/mgx_env.py @@ -1,30 +1,24 @@ from __future__ import annotations -from metagpt.actions import ( - UserRequirement, - WriteDesign, - WritePRD, - WriteTasks, - WriteTest, -) -from metagpt.actions.summarize_code import SummarizeCode -from metagpt.const import AGENT, IMAGES, TEAMLEADER_NAME +from metagpt.const import AGENT, IMAGES, MESSAGE_ROUTE_TO_ALL, TEAMLEADER_NAME from metagpt.environment.base_env import Environment from metagpt.logs import get_human_input -from metagpt.roles import Architect, ProductManager, ProjectManager, Role +from metagpt.roles import Role from metagpt.schema import Message, SerializationMixin -from metagpt.utils.common import any_to_str, any_to_str_set, extract_and_encode_images +from metagpt.utils.common import extract_and_encode_images class MGXEnv(Environment, SerializationMixin): """MGX Environment""" - # If True, fixed software sop bypassing TL is allowed, otherwise, TL will fully take over the routing - allow_bypass_team_leader: bool = False - direct_chat_roles: set[str] = set() # record direct chat: @role_name + is_public_chat: bool = True + def _publish_message(self, message: Message, peekable: bool = True) -> bool: + if self.is_public_chat: + message.send_to.add(MESSAGE_ROUTE_TO_ALL) + message = self.move_message_info_to_content(message) return super().publish_message(message, peekable) def publish_message(self, message: Message, user_defined_recipient: str = "", publicer: str = "") -> bool: @@ -46,31 +40,10 @@ class MGXEnv(Environment, SerializationMixin): # tl.rc.memory.add(self.move_message_info_to_content(message)) elif message.sent_from in self.direct_chat_roles: - # direct chat response from a certain role to human user, team leader and other roles in the env should not be involved, no need to publish + # if chat is not public, direct chat 
response from a certain role to human user, team leader and other roles in the env should not be involved, no need to publish self.direct_chat_roles.remove(message.sent_from) - - elif ( - self.allow_bypass_team_leader - and self.message_within_software_sop(message) - and not self.has_user_requirement() - ): - # Quick routing for messages within software SOP, bypassing TL. - # Use rules to check for user intervention and to finish task. - # NOTE: This escapes TL's supervision and has pitfalls such as routing obsolete messages even if TL has acquired a new user requirement. - # In addition, we should not determine the status of a task based on message cause_by. - # Consider replacing this in the future. - self._publish_message(message) - if self.is_software_task_finished(message): - tl.rc.memory.add(self.move_message_info_to_content(message)) - from metagpt.utils.report import CURRENT_ROLE - - role = CURRENT_ROLE.get(None) - if role: - CURRENT_ROLE.set(tl) - tl.finish_current_task() - CURRENT_ROLE.set(role) - else: - tl.finish_current_task() + if self.is_public_chat: + self._publish_message(message) elif publicer == tl.profile: if message.send_to == {"no one"}: @@ -81,9 +54,8 @@ class MGXEnv(Environment, SerializationMixin): else: # every regular message goes through team leader - message = self.move_message_info_to_content(message) message.send_to.add(tl.name) - tl.put_message(message) + self._publish_message(message) self.history.add(message) @@ -98,21 +70,6 @@ class MGXEnv(Environment, SerializationMixin): # NOTE: Can be overwritten in remote setting return "SUCCESS, human has received your reply. Refrain from resending duplicate messages. If you no longer need to take action, use the command ‘end’ to stop." - def message_within_software_sop(self, message: Message) -> bool: - # Engineer, QaEngineer can be end of the SOP. Their msg requires routing outside. 
- members_concerned = [ProductManager, Architect, ProjectManager] - return message.sent_from in any_to_str_set(members_concerned) - - def has_user_requirement(self, k=1) -> bool: - """A heuristics to check if there is a recent user intervention""" - return any_to_str(UserRequirement) in [msg.cause_by for msg in self.history.get(k)] - - def is_software_task_finished(self, message: Message) -> bool: - """Use a hard-coded rule to check if one software task is finished""" - return message.cause_by in any_to_str_set([WritePRD, WriteDesign, WriteTasks, SummarizeCode]) or ( - message.cause_by == any_to_str(WriteTest) and "Exceeding" in message.content - ) - def move_message_info_to_content(self, message: Message) -> Message: """Two things here: 1. Convert role, since role field must be reserved for LLM API, and is limited to, for example, one of ["user", "assistant", "system"] @@ -122,9 +79,13 @@ class MGXEnv(Environment, SerializationMixin): if converted_msg.role not in ["system", "user", "assistant"]: converted_msg.role = "assistant" sent_from = converted_msg.metadata[AGENT] if AGENT in converted_msg.metadata else converted_msg.sent_from - converted_msg.content = ( - f"[Message] from {sent_from or 'User'} to {converted_msg.send_to}: {converted_msg.content}" - ) + # When displaying send_to, change it to those who need to react and exclude those who only need to be aware, e.g.: + # send_to={<all>} -> Mike; send_to={Alice} -> Alice; send_to={Alice, <all>} -> Alice. 
+ if converted_msg.send_to == {MESSAGE_ROUTE_TO_ALL}: + send_to = TEAMLEADER_NAME + else: + send_to = ", ".join({role for role in converted_msg.send_to if role != MESSAGE_ROUTE_TO_ALL}) + converted_msg.content = f"[Message] from {sent_from or 'User'} to {send_to}: {converted_msg.content}" return converted_msg def attach_images(self, message: Message) -> Message: diff --git a/metagpt/prompts/di/role_zero.py b/metagpt/prompts/di/role_zero.py index af65b7e82..5e3eb0a98 100644 --- a/metagpt/prompts/di/role_zero.py +++ b/metagpt/prompts/di/role_zero.py @@ -231,6 +231,8 @@ QUICK_RESPONSE_SYSTEM_PROMPT = """ {role_info} However, you MUST respond to the user message by yourself directly, DON'T ask your team members. """ +# A tag to indicate message caused by quick think +QUICK_THINK_TAG = "QuickThink" REPORT_TO_HUMAN_PROMPT = """ ## Examlpe diff --git a/metagpt/roles/di/role_zero.py b/metagpt/roles/di/role_zero.py index b66d8bf8c..4614de0f3 100644 --- a/metagpt/roles/di/role_zero.py +++ b/metagpt/roles/di/role_zero.py @@ -29,6 +29,7 @@ from metagpt.prompts.di.role_zero import ( QUICK_THINK_EXAMPLES, QUICK_THINK_PROMPT, QUICK_THINK_SYSTEM_PROMPT, + QUICK_THINK_TAG, REGENERATE_PROMPT, REPORT_TO_HUMAN_PROMPT, ROLE_INSTRUCTION, @@ -93,6 +94,7 @@ class RoleZero(Role): experience_retriever: Annotated[ExpRetriever, Field(exclude=True)] = DummyExpRetriever() # Others + observe_all_msg_from_buffer: bool = True command_rsp: str = "" # the raw string containing the commands commands: list[dict] = [] # commands to be executed memory_k: int = 200 # number of memories (messages) to use as historical context @@ -396,12 +398,12 @@ class RoleZero(Role): answer = await SearchEnhancedQA().run(query) if answer: - self.rc.memory.add(AIMessage(content=answer, cause_by=RunCommand)) + self.rc.memory.add(AIMessage(content=answer, cause_by=QUICK_THINK_TAG)) await self.reply_to_human(content=answer) rsp_msg = AIMessage( - content="Complete run", + content=answer, sent_from=self.name, - 
cause_by=RunCommand, + cause_by=QUICK_THINK_TAG, ) return rsp_msg, intent_result diff --git a/metagpt/roles/di/team_leader.py b/metagpt/roles/di/team_leader.py index 0724ffdea..7a8b8b5be 100644 --- a/metagpt/roles/di/team_leader.py +++ b/metagpt/roles/di/team_leader.py @@ -6,6 +6,7 @@ from pydantic import Field from metagpt.actions.di.run_command import RunCommand from metagpt.const import TEAMLEADER_NAME +from metagpt.prompts.di.role_zero import QUICK_THINK_TAG from metagpt.prompts.di.team_leader import ( FINISH_CURRENT_TASK_CMD, TL_INFO, @@ -61,13 +62,14 @@ class TeamLeader(RoleZero): return await super()._think() def publish_message(self, msg: Message, send_to="no one"): - """Overwrite Role.publish_message, send to no one if called within Role.run, send to the specified role if called dynamically.""" + """Overwrite Role.publish_message, send to no one if called within Role.run (except for quick think), send to the specified role if called dynamically.""" if not msg: return if not self.rc.env: # If env does not exist, do not publish the message return - msg.send_to = send_to + if msg.cause_by != QUICK_THINK_TAG: + msg.send_to = send_to self.rc.env.publish_message(msg, publicer=self.profile) def publish_team_message(self, content: str, send_to: str): diff --git a/metagpt/roles/product_manager.py b/metagpt/roles/product_manager.py index 93d83e329..f2a47affd 100644 --- a/metagpt/roles/product_manager.py +++ b/metagpt/roles/product_manager.py @@ -42,8 +42,8 @@ class ProductManager(RoleZero): # NOTE: The following init setting will only be effective when self.use_fixed_sop is changed to True self.enable_memory = False self.set_actions([PrepareDocuments(send_to=any_to_str(self)), WritePRD]) - self._watch([UserRequirement, PrepareDocuments]) if self.use_fixed_sop: + self._watch([UserRequirement, PrepareDocuments]) self.rc.react_mode = RoleReactMode.BY_ORDER def _update_tool_execution(self): diff --git a/metagpt/roles/role.py b/metagpt/roles/role.py index 
5d1050464..1851dd20f 100644 --- a/metagpt/roles/role.py +++ b/metagpt/roles/role.py @@ -154,6 +154,7 @@ class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel): # builtin variables recovered: bool = False # to tag if a recovered role latest_observed_msg: Optional[Message] = None # record the latest observed message when interrupted + observe_all_msg_from_buffer: bool = False # whether to save all msgs from buffer to memory for role's awareness __hash__ = object.__hash__ # support Role as hashable type in `Environment.members` @@ -171,7 +172,9 @@ class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel): self._check_actions() self.llm.system_prompt = self._get_prefix() self.llm.cost_manager = self.context.cost_manager - self._watch(kwargs.pop("watch", [UserRequirement])) + # if observe_all_msg_from_buffer, we should not use cause_by to select messages but observe all + if not self.observe_all_msg_from_buffer: + self._watch(kwargs.pop("watch", [UserRequirement])) if self.latest_observed_msg: self.recovered = True @@ -396,7 +399,12 @@ class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel): self.rc.news = [ n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages ] - self.rc.memory.add_batch(self.rc.news) # only save messages of interest into memory + if self.observe_all_msg_from_buffer: + # save all new messages from the buffer into memory, the role may not react to them but can be aware of them + self.rc.memory.add_batch(news) + else: + # only save messages of interest into memory + self.rc.memory.add_batch(self.rc.news) self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None # record the latest observed msg # Design Rules: diff --git a/tests/metagpt/environment/mgx_env/run_mgx_env.py b/tests/metagpt/environment/mgx_env/run_mgx_env.py index f0f561774..dd9e7c3e5 100644 --- a/tests/metagpt/environment/mgx_env/run_mgx_env.py +++ b/tests/metagpt/environment/mgx_env/run_mgx_env.py @@ 
-18,7 +18,7 @@ async def main(requirement="", enable_human_input=False, use_fixed_sop=False, al else: engineer = Engineer2() - env = MGXEnv(allow_bypass_team_leader=use_fixed_sop) + env = MGXEnv() env.add_roles( [ TeamLeader(),