Merge branch 'dynamic_think' into 'mgx_ops'

bugfix for run_command and add a switch for routing

See merge request pub/MetaGPT!87
This commit is contained in:
林义章 2024-04-30 10:48:44 +00:00
commit 2cf28e5a18
4 changed files with 27 additions and 15 deletions

View file

@ -6,6 +6,7 @@ from metagpt.actions import (
WriteTest,
)
from metagpt.actions.summarize_code import SummarizeCode
from metagpt.const import AGENT
from metagpt.environment.base_env import Environment
from metagpt.logs import get_human_input
from metagpt.roles import (
@ -23,6 +24,9 @@ from metagpt.utils.common import any_to_str, any_to_str_set
class MGXEnv(Environment):
"""MGX Environment"""
# Before enabling TL to fully take over the routing, all software company roles need to be able to handle TL messages, which requires restructuring.
allow_bypass_team_leader: bool = True
def _publish_message(self, message: Message, peekable: bool = True) -> bool:
return super().publish_message(message, peekable)
@ -35,7 +39,11 @@ class MGXEnv(Environment):
# bypass team leader, team leader only needs to know but not to react
tl.rc.memory.add(self.move_message_info_to_content(message))
elif self.message_within_software_sop(message) and not self.has_user_requirement():
elif (
self.allow_bypass_team_leader
and self.message_within_software_sop(message)
and not self.has_user_requirement()
):
# Quick routing for messages within software SOP, bypassing TL.
# Use rules to check for user intervention and to finish task.
# NOTE: This escapes TL's supervision and has pitfalls such as routing obsolete messages even if TL has acquired a new user requirement.
@ -47,6 +55,9 @@ class MGXEnv(Environment):
tl.finish_current_task()
elif publicer == tl.profile:
if message.send_to == {"no one"}:
# skip the dummy message from team leader
return True
# message processed by team leader can be published now
self._publish_message(message)
@ -71,7 +82,7 @@ class MGXEnv(Environment):
def message_within_software_sop(self, message: Message) -> bool:
return message.sent_from in any_to_str_set([ProductManager, Architect, ProjectManager, Engineer, QaEngineer])
def has_user_requirement(self, k=2) -> bool:
def has_user_requirement(self, k=1) -> bool:
"""A heuristics to check if there is a recent user intervention"""
return any_to_str(UserRequirement) in [msg.cause_by for msg in self.history.get(k)]
@ -86,10 +97,8 @@ class MGXEnv(Environment):
1. Convert role, since role field must be reserved for LLM API, and is limited to, for example, one of ["user", "assistant", "system"]
2. Add sender and recipient info to content, making TL aware, since LLM API only takes content as input
"""
if message.role in ["system", "user", "assistant"]:
sent_from = message.sent_from
else:
sent_from = message.role
if message.role not in ["system", "user", "assistant"]:
message.role = "assistant"
sent_from = message.metadata[AGENT] if AGENT in message.metadata else message.sent_from
message.content = f"from {sent_from} to {message.send_to}: {message.content}"
return message

View file

@ -83,7 +83,7 @@ class DataAnalyst(DataInterpreter):
self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp))
self.rc.memory.add(Message(content=rsp, role="assistant"))
await run_commands(self, self.commands)
await run_commands(self, self.commands, self.rc.working_memory)
return bool(self.rc.todo)

View file

@ -40,9 +40,6 @@ class TeamLeader(Role):
@model_validator(mode="after")
def set_plan(self) -> "TeamLeader":
self.rc.working_memory = (
self.rc.memory
) # TeamLeader does not need working memory, all messages should go into memory
self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=True)
return self
@ -80,7 +77,7 @@ class TeamLeader(Role):
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
await run_commands(self, self.commands)
await run_commands(self, self.commands, self.rc.memory)
self.task_result = TaskResult(result="Success", is_success=True)
msg = Message(content="Commands executed", send_to="no one") # a dummy message to conform to the interface
self.rc.memory.add(msg)

View file

@ -1,8 +1,11 @@
from __future__ import annotations
from enum import Enum
from pydantic import BaseModel
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.memory import Memory
from metagpt.roles import Role
from metagpt.schema import Message, Task
@ -72,11 +75,12 @@ def prepare_command_prompt(commands: list[Command]) -> str:
return command_prompt
async def run_env_command(role: Role, cmd):
async def run_env_command(role: Role, cmd: list[dict], role_memory: Memory = None):
assert isinstance(role.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv"
if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name:
role.publish_message(Message(**cmd["args"]))
if cmd["command_name"] == Command.ASK_HUMAN.cmd_name:
# TODO: Operation on role memory should not appear here, consider moving it into role
role.rc.working_memory.add(Message(content=cmd["args"]["question"], role="assistant"))
human_rsp = await role.rc.env.ask_human(sent_from=role, **cmd["args"])
role.rc.working_memory.add(Message(content=human_rsp, role="user"))
@ -85,7 +89,7 @@ async def run_env_command(role: Role, cmd):
await role.rc.env.reply_to_human(sent_from=role, **cmd["args"])
def run_plan_command(role: Role, cmd):
def run_plan_command(role: Role, cmd: list[dict]):
if cmd["command_name"] == Command.APPEND_TASK.cmd_name:
role.planner.plan.append_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.RESET_TASK.cmd_name:
@ -93,15 +97,17 @@ def run_plan_command(role: Role, cmd):
elif cmd["command_name"] == Command.REPLACE_TASK.cmd_name:
role.planner.plan.replace_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.FINISH_CURRENT_TASK.cmd_name:
if role.planner.plan.is_plan_finished():
return
role.planner.plan.current_task.update_task_result(task_result=role.task_result)
role.planner.plan.finish_current_task()
role.rc.working_memory.clear()
async def run_commands(role: Role, cmds):
async def run_commands(role: Role, cmds: list[dict], role_memory: Memory = None):
print(*cmds, sep="\n")
for cmd in cmds:
await run_env_command(role, cmd)
await run_env_command(role, cmd, role_memory)
run_plan_command(role, cmd)
if role.planner.plan.is_plan_finished():