diff --git a/metagpt/environment/mgx/mgx_env.py b/metagpt/environment/mgx/mgx_env.py index 6c7069b56..c4ed8c4f1 100644 --- a/metagpt/environment/mgx/mgx_env.py +++ b/metagpt/environment/mgx/mgx_env.py @@ -6,6 +6,7 @@ from metagpt.actions import ( WriteTest, ) from metagpt.actions.summarize_code import SummarizeCode +from metagpt.const import AGENT from metagpt.environment.base_env import Environment from metagpt.logs import get_human_input from metagpt.roles import ( @@ -23,6 +24,9 @@ from metagpt.utils.common import any_to_str, any_to_str_set class MGXEnv(Environment): """MGX Environment""" + # Before enabling TL to fully take over the routing, all software company roles need to be able to handle TL messages, which requires restructuring. + allow_bypass_team_leader: bool = True + def _publish_message(self, message: Message, peekable: bool = True) -> bool: return super().publish_message(message, peekable) @@ -35,7 +39,11 @@ class MGXEnv(Environment): # bypass team leader, team leader only needs to know but not to react tl.rc.memory.add(self.move_message_info_to_content(message)) - elif self.message_within_software_sop(message) and not self.has_user_requirement(): + elif ( + self.allow_bypass_team_leader + and self.message_within_software_sop(message) + and not self.has_user_requirement() + ): # Quick routing for messages within software SOP, bypassing TL. # Use rules to check for user intervention and to finish task. # NOTE: This escapes TL's supervision and has pitfalls such as routing obsolete messages even if TL has acquired a new user requirement. @@ -47,6 +55,9 @@ class MGXEnv(Environment): tl.finish_current_task() elif publicer == tl.profile: + if message.send_to == {"no one"}: + # skip the dummy message from team leader + return True # message processed by team leader can be published now self._publish_message(message) @@ -71,7 +82,7 @@ class MGXEnv(Environment): def message_within_software_sop(self, message: Message) -> bool: return message.sent_from in any_to_str_set([ProductManager, Architect, ProjectManager, Engineer, QaEngineer]) - def has_user_requirement(self, k=2) -> bool: + def has_user_requirement(self, k=1) -> bool: """A heuristics to check if there is a recent user intervention""" return any_to_str(UserRequirement) in [msg.cause_by for msg in self.history.get(k)] @@ -86,10 +97,8 @@ class MGXEnv(Environment): 1. Convert role, since role field must be reserved for LLM API, and is limited to, for example, one of ["user", "assistant", "system"] 2. Add sender and recipient info to content, making TL aware, since LLM API only takes content as input """ - if message.role in ["system", "user", "assistant"]: - sent_from = message.sent_from - else: - sent_from = message.role + if message.role not in ["system", "user", "assistant"]: message.role = "assistant" + sent_from = message.metadata[AGENT] if AGENT in message.metadata else message.sent_from message.content = f"from {sent_from} to {message.send_to}: {message.content}" return message diff --git a/metagpt/roles/di/data_analyst.py b/metagpt/roles/di/data_analyst.py index 84f72664b..4c959575b 100644 --- a/metagpt/roles/di/data_analyst.py +++ b/metagpt/roles/di/data_analyst.py @@ -83,7 +83,7 @@ class DataAnalyst(DataInterpreter): self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp)) self.rc.memory.add(Message(content=rsp, role="assistant")) - await run_commands(self, self.commands) + await run_commands(self, self.commands, self.rc.working_memory) return bool(self.rc.todo) diff --git a/metagpt/roles/di/team_leader.py b/metagpt/roles/di/team_leader.py index 1bffc90bc..a1ef11fa6 100644 --- a/metagpt/roles/di/team_leader.py +++ b/metagpt/roles/di/team_leader.py @@ -40,9 +40,6 @@ class TeamLeader(Role): @model_validator(mode="after") def set_plan(self) -> "TeamLeader": - self.rc.working_memory = ( - self.rc.memory - ) # TeamLeader does not need working memory, all messages should go into memory self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=True) return self @@ -80,7 +77,7 @@ class TeamLeader(Role): async def _act(self) -> Message: """Useful in 'react' mode. Return a Message conforming to Role._act interface.""" - await run_commands(self, self.commands) + await run_commands(self, self.commands, self.rc.memory) self.task_result = TaskResult(result="Success", is_success=True) msg = Message(content="Commands executed", send_to="no one") # a dummy message to conform to the interface self.rc.memory.add(msg) diff --git a/metagpt/strategy/thinking_command.py b/metagpt/strategy/thinking_command.py index 52379d4ec..f852c3764 100644 --- a/metagpt/strategy/thinking_command.py +++ b/metagpt/strategy/thinking_command.py @@ -1,8 +1,11 @@ +from __future__ import annotations + from enum import Enum from pydantic import BaseModel from metagpt.environment.mgx.mgx_env import MGXEnv +from metagpt.memory import Memory from metagpt.roles import Role from metagpt.schema import Message, Task @@ -72,11 +75,12 @@ def prepare_command_prompt(commands: list[Command]) -> str: return command_prompt -async def run_env_command(role: Role, cmd): +async def run_env_command(role: Role, cmd: list[dict], role_memory: Memory = None): assert isinstance(role.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv" if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name: role.publish_message(Message(**cmd["args"])) if cmd["command_name"] == Command.ASK_HUMAN.cmd_name: + # TODO: Operation on role memory should not appear here, consider moving it into role role.rc.working_memory.add(Message(content=cmd["args"]["question"], role="assistant")) human_rsp = await role.rc.env.ask_human(sent_from=role, **cmd["args"]) role.rc.working_memory.add(Message(content=human_rsp, role="user")) @@ -85,7 +89,7 @@ async def run_env_command(role: Role, cmd): await role.rc.env.reply_to_human(sent_from=role, **cmd["args"]) -def run_plan_command(role: Role, cmd): +def run_plan_command(role: Role, cmd: list[dict]): if cmd["command_name"] == Command.APPEND_TASK.cmd_name: role.planner.plan.append_task(Task(**cmd["args"])) elif cmd["command_name"] == Command.RESET_TASK.cmd_name: @@ -93,15 +97,17 @@ def run_plan_command(role: Role, cmd): elif cmd["command_name"] == Command.REPLACE_TASK.cmd_name: role.planner.plan.replace_task(Task(**cmd["args"])) elif cmd["command_name"] == Command.FINISH_CURRENT_TASK.cmd_name: + if role.planner.plan.is_plan_finished(): + return role.planner.plan.current_task.update_task_result(task_result=role.task_result) role.planner.plan.finish_current_task() role.rc.working_memory.clear() -async def run_commands(role: Role, cmds): +async def run_commands(role: Role, cmds: list[dict], role_memory: Memory = None): print(*cmds, sep="\n") for cmd in cmds: - await run_env_command(role, cmd) + await run_env_command(role, cmd, role_memory) run_plan_command(role, cmd) if role.planner.plan.is_plan_finished():