add quick routing rule

This commit is contained in:
yzlin 2024-04-26 00:45:32 +08:00
parent c63a7ecc18
commit 3590cd77b6
4 changed files with 70 additions and 6 deletions

View file

@ -19,6 +19,7 @@ from metagpt.environment.api.env_api import (
)
from metagpt.environment.base_env_space import BaseEnvAction, BaseEnvObsParams
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.schema import Message
from metagpt.utils.common import get_function_schema, is_coroutine_func, is_send_to
@ -131,7 +132,7 @@ class Environment(ExtEnv):
desc: str = Field(default="") # 环境描述
roles: dict[str, SerializeAsAny["Role"]] = Field(default_factory=dict, validate_default=True)
member_addrs: Dict["Role", Set] = Field(default_factory=dict, exclude=True)
history: str = "" # For debug
history: Memory = Field(default_factory=Memory) # For debug
context: Context = Field(default_factory=Context, exclude=True)
def reset(
@ -190,7 +191,7 @@ class Environment(ExtEnv):
found = True
if not found:
logger.warning(f"Message no recipients: {message.dump()}")
self.history += f"\n{message}" # For debug
self.history.add(message) # For debug
return True

View file

@ -1,6 +1,22 @@
from metagpt.actions import (
UserRequirement,
WriteDesign,
WritePRD,
WriteTasks,
WriteTest,
)
from metagpt.actions.summarize_code import SummarizeCode
from metagpt.environment.base_env import Environment
from metagpt.logs import get_human_input
from metagpt.roles import (
Architect,
Engineer,
ProductManager,
ProjectManager,
QaEngineer,
)
from metagpt.schema import Message
from metagpt.utils.common import any_to_str, any_to_str_set
class MGXEnv(Environment):
@ -18,6 +34,17 @@ class MGXEnv(Environment):
# bypass team leader, team leader only needs to know but not to react
tl.rc.memory.add(message)
elif self.message_within_software_sop(message) and not self.has_user_requirement():
# Quick routing for messages within software SOP, bypassing TL.
# Use rules to check for user intervention and to finish task.
# NOTE: This escapes TL's supervision and has pitfalls such as routing obsolete messages when TL has acquired a new user requirement.
# In addition, we should not determine the status of a task based on message cause_by.
# Consider replacing this in the future.
self._publish_message(message)
if self.is_software_task_finished(message):
tl.rc.memory.add(message)
tl.finish_current_task()
elif publicer == tl.profile:
# message processed by team leader can be published now
self._publish_message(message)
@ -27,6 +54,8 @@ class MGXEnv(Environment):
message.send_to.add(tl.name)
tl.put_message(message)
self.history.add(message)
return True
async def ask_human(self, question: str) -> str:
@ -36,3 +65,16 @@ class MGXEnv(Environment):
async def reply_to_human(self, content: str) -> str:
# NOTE: Can be overwritten in remote setting
return content
def message_within_software_sop(self, message: Message) -> bool:
return message.sent_from in any_to_str_set([ProductManager, Architect, ProjectManager, Engineer, QaEngineer])
def has_user_requirement(self, k=3) -> bool:
"""A heuristics to check if there is a recent user intervention"""
return any_to_str(UserRequirement) in [msg.cause_by for msg in self.history.get(k)]
def is_software_task_finished(self, message: Message) -> bool:
"""Use a hard-coded rule to check if one software task is finished"""
return message.cause_by in any_to_str_set([WritePRD, WriteDesign, WriteTasks, SummarizeCode]) or (
message.cause_by == any_to_str(WriteTest) and "Exceeding" in message.content
)

View file

@ -46,8 +46,8 @@ Pay close attention to the Example provided
You may use any of the available commands to create a plan or update the plan. You may output mutiple commands, they will be executed sequentially.
If you finish current task, you will automatically take the next task in the existing plan, use finish_task, DON'T append a new task.
# Your commands in a json array, in the following output format:
Some text indicating your thoughts, including how you categorize the requirement based on Note (is it 1., 2., or 3.?) or how you should update the plan status. Then a json array of commands.
# Your commands in a json array, in the following output format, always output a json array, if there is nothing to do, use the pass command:
Some text indicating your thoughts, such as how you categorize the requirement based on Note (is it 1., 2., or 3.?) or how you should update the plan status. Then a json array of commands.
```json
[
{{
@ -58,3 +58,13 @@ Some text indicating your thoughts, including how you categorize the requirement
]
```
"""
FINISH_CURRENT_TASK_CMD = """
```json
[
{
"command_name": "finish_current_task",
"args": {{}}
}
```
"""

View file

@ -4,8 +4,13 @@ import json
from pydantic import model_validator
from metagpt.actions.di.run_command import RunCommand
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.prompts.di.team_leader import CMD_PROMPT, prepare_command_prompt
from metagpt.prompts.di.team_leader import (
CMD_PROMPT,
FINISH_CURRENT_TASK_CMD,
prepare_command_prompt,
)
from metagpt.roles import Role
from metagpt.schema import Message, Task, TaskResult
from metagpt.strategy.experience_retriever import SimpleExpRetriever
@ -37,7 +42,7 @@ class TeamLeader(Role):
def _run_env_command(self, cmd):
assert isinstance(self.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv"
if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name:
self.publish_message(Message(sent_from=self.profile, **cmd["args"]))
self.publish_message(Message(**cmd["args"]))
elif cmd["command_name"] == Command.ASK_HUMAN.cmd_name:
self.rc.env.ask_human(**cmd["args"])
elif cmd["command_name"] == Command.REPLY_TO_HUMAN.cmd_name:
@ -121,4 +126,10 @@ class TeamLeader(Role):
if not self.rc.env:
# If env does not exist, do not publish the message
return
msg.sent_from = self.profile
msg.cause_by = RunCommand
self.rc.env.publish_message(msg, publicer=self.profile)
def finish_current_task(self):
self.planner.plan.finish_current_task()
self.rc.memory.add(Message(content=FINISH_CURRENT_TASK_CMD, role="assistant"))