From 3590cd77b6a234e39a0d77ff2bfc6e86b3891006 Mon Sep 17 00:00:00 2001 From: yzlin Date: Fri, 26 Apr 2024 00:45:32 +0800 Subject: [PATCH] add quick routing rule --- metagpt/environment/base_env.py | 5 ++-- metagpt/environment/mgx/mgx_env.py | 42 ++++++++++++++++++++++++++++++ metagpt/prompts/di/team_leader.py | 14 ++++++++-- metagpt/roles/di/team_leader.py | 15 +++++++++-- 4 files changed, 70 insertions(+), 6 deletions(-) diff --git a/metagpt/environment/base_env.py b/metagpt/environment/base_env.py index 024c46877..f6d2e431d 100644 --- a/metagpt/environment/base_env.py +++ b/metagpt/environment/base_env.py @@ -19,6 +19,7 @@ from metagpt.environment.api.env_api import ( ) from metagpt.environment.base_env_space import BaseEnvAction, BaseEnvObsParams from metagpt.logs import logger +from metagpt.memory import Memory from metagpt.schema import Message from metagpt.utils.common import get_function_schema, is_coroutine_func, is_send_to @@ -131,7 +132,7 @@ class Environment(ExtEnv): desc: str = Field(default="") # 环境描述 roles: dict[str, SerializeAsAny["Role"]] = Field(default_factory=dict, validate_default=True) member_addrs: Dict["Role", Set] = Field(default_factory=dict, exclude=True) - history: str = "" # For debug + history: Memory = Field(default_factory=Memory) # For debug context: Context = Field(default_factory=Context, exclude=True) def reset( @@ -190,7 +191,7 @@ class Environment(ExtEnv): found = True if not found: logger.warning(f"Message no recipients: {message.dump()}") - self.history += f"\n{message}" # For debug + self.history.add(message) # For debug return True diff --git a/metagpt/environment/mgx/mgx_env.py b/metagpt/environment/mgx/mgx_env.py index a6e15ffa5..17b479679 100644 --- a/metagpt/environment/mgx/mgx_env.py +++ b/metagpt/environment/mgx/mgx_env.py @@ -1,6 +1,22 @@ +from metagpt.actions import ( + UserRequirement, + WriteDesign, + WritePRD, + WriteTasks, + WriteTest, +) +from metagpt.actions.summarize_code import SummarizeCode from metagpt.environment.base_env import Environment from metagpt.logs import get_human_input +from metagpt.roles import ( + Architect, + Engineer, + ProductManager, + ProjectManager, + QaEngineer, +) from metagpt.schema import Message +from metagpt.utils.common import any_to_str, any_to_str_set class MGXEnv(Environment): @@ -18,6 +34,17 @@ class MGXEnv(Environment): # bypass team leader, team leader only needs to know but not to react tl.rc.memory.add(message) + elif self.message_within_software_sop(message) and not self.has_user_requirement(): + # Quick routing for messages within software SOP, bypassing TL. + # Use rules to check for user intervention and to finish task. + # NOTE: This escapes TL's supervision and has pitfalls such as routing obsolete messages when TL has acquired a new user requirement. + # In addition, we should not determine the status of a task based on message cause_by. + # Consider replacing this in the future. + self._publish_message(message) + if self.is_software_task_finished(message): + tl.rc.memory.add(message) + tl.finish_current_task() + elif publicer == tl.profile: # message processed by team leader can be published now self._publish_message(message) @@ -27,6 +54,8 @@ class MGXEnv(Environment): message.send_to.add(tl.name) tl.put_message(message) + self.history.add(message) + return True async def ask_human(self, question: str) -> str: @@ -36,3 +65,16 @@ class MGXEnv(Environment): async def reply_to_human(self, content: str) -> str: # NOTE: Can be overwritten in remote setting return content + + def message_within_software_sop(self, message: Message) -> bool: + return message.sent_from in any_to_str_set([ProductManager, Architect, ProjectManager, Engineer, QaEngineer]) + + def has_user_requirement(self, k=3) -> bool: + """A heuristics to check if there is a recent user intervention""" + return any_to_str(UserRequirement) in [msg.cause_by for msg in self.history.get(k)] + + def is_software_task_finished(self, message: Message) -> bool: + """Use a hard-coded rule to check if one software task is finished""" + return message.cause_by in any_to_str_set([WritePRD, WriteDesign, WriteTasks, SummarizeCode]) or ( + message.cause_by == any_to_str(WriteTest) and "Exceeding" in message.content + ) diff --git a/metagpt/prompts/di/team_leader.py b/metagpt/prompts/di/team_leader.py index 21ec86f5e..87e8a1336 100644 --- a/metagpt/prompts/di/team_leader.py +++ b/metagpt/prompts/di/team_leader.py @@ -46,8 +46,8 @@ Pay close attention to the Example provided You may use any of the available commands to create a plan or update the plan. You may output mutiple commands, they will be executed sequentially. If you finish current task, you will automatically take the next task in the existing plan, use finish_task, DON'T append a new task. -# Your commands in a json array, in the following output format: -Some text indicating your thoughts, including how you categorize the requirement based on Note (is it 1., 2., or 3.?) or how you should update the plan status. Then a json array of commands. +# Your commands in a json array, in the following output format, always output a json array, if there is nothing to do, use the pass command: +Some text indicating your thoughts, such as how you categorize the requirement based on Note (is it 1., 2., or 3.?) or how you should update the plan status. Then a json array of commands. ```json [ {{ @@ -58,3 +58,13 @@ Some text indicating your thoughts, including how you categorize the requirement ] ``` """ + +FINISH_CURRENT_TASK_CMD = """ +```json +[ + { + "command_name": "finish_current_task", + "args": {{}} + } +``` +""" diff --git a/metagpt/roles/di/team_leader.py b/metagpt/roles/di/team_leader.py index 48302d00b..0034a9eef 100644 --- a/metagpt/roles/di/team_leader.py +++ b/metagpt/roles/di/team_leader.py @@ -4,8 +4,13 @@ import json from pydantic import model_validator +from metagpt.actions.di.run_command import RunCommand from metagpt.environment.mgx.mgx_env import MGXEnv -from metagpt.prompts.di.team_leader import CMD_PROMPT, prepare_command_prompt +from metagpt.prompts.di.team_leader import ( + CMD_PROMPT, + FINISH_CURRENT_TASK_CMD, + prepare_command_prompt, +) from metagpt.roles import Role from metagpt.schema import Message, Task, TaskResult from metagpt.strategy.experience_retriever import SimpleExpRetriever @@ -37,7 +42,7 @@ class TeamLeader(Role): def _run_env_command(self, cmd): assert isinstance(self.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv" if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name: - self.publish_message(Message(sent_from=self.profile, **cmd["args"])) + self.publish_message(Message(**cmd["args"])) elif cmd["command_name"] == Command.ASK_HUMAN.cmd_name: self.rc.env.ask_human(**cmd["args"]) elif cmd["command_name"] == Command.REPLY_TO_HUMAN.cmd_name: @@ -121,4 +126,10 @@ class TeamLeader(Role): if not self.rc.env: # If env does not exist, do not publish the message return + msg.sent_from = self.profile + msg.cause_by = RunCommand self.rc.env.publish_message(msg, publicer=self.profile) + + def finish_current_task(self): + self.planner.plan.finish_current_task() + self.rc.memory.add(Message(content=FINISH_CURRENT_TASK_CMD, role="assistant"))