Merge branch 'public_chat' into 'mgx_ops'

Switch to Public Chat

See merge request pub/MetaGPT!397
This commit is contained in:
林义章 2024-10-16 06:12:44 +00:00
commit 6f40dc62a1
7 changed files with 42 additions and 67 deletions

View file

@ -1,30 +1,24 @@
from __future__ import annotations
from metagpt.actions import (
UserRequirement,
WriteDesign,
WritePRD,
WriteTasks,
WriteTest,
)
from metagpt.actions.summarize_code import SummarizeCode
from metagpt.const import AGENT, IMAGES, TEAMLEADER_NAME
from metagpt.const import AGENT, IMAGES, MESSAGE_ROUTE_TO_ALL, TEAMLEADER_NAME
from metagpt.environment.base_env import Environment
from metagpt.logs import get_human_input
from metagpt.roles import Architect, ProductManager, ProjectManager, Role
from metagpt.roles import Role
from metagpt.schema import Message, SerializationMixin
from metagpt.utils.common import any_to_str, any_to_str_set, extract_and_encode_images
from metagpt.utils.common import extract_and_encode_images
class MGXEnv(Environment, SerializationMixin):
"""MGX Environment"""
# If True, fixed software sop bypassing TL is allowed, otherwise, TL will fully take over the routing
allow_bypass_team_leader: bool = False
direct_chat_roles: set[str] = set() # record direct chat: @role_name
is_public_chat: bool = True
def _publish_message(self, message: Message, peekable: bool = True) -> bool:
if self.is_public_chat:
message.send_to.add(MESSAGE_ROUTE_TO_ALL)
message = self.move_message_info_to_content(message)
return super().publish_message(message, peekable)
def publish_message(self, message: Message, user_defined_recipient: str = "", publicer: str = "") -> bool:
@ -46,31 +40,10 @@ class MGXEnv(Environment, SerializationMixin):
# tl.rc.memory.add(self.move_message_info_to_content(message))
elif message.sent_from in self.direct_chat_roles:
# direct chat response from a certain role to human user, team leader and other roles in the env should not be involved, no need to publish
# if chat is not public, direct chat response from a certain role to human user, team leader and other roles in the env should not be involved, no need to publish
self.direct_chat_roles.remove(message.sent_from)
elif (
self.allow_bypass_team_leader
and self.message_within_software_sop(message)
and not self.has_user_requirement()
):
# Quick routing for messages within software SOP, bypassing TL.
# Use rules to check for user intervention and to finish task.
# NOTE: This escapes TL's supervision and has pitfalls such as routing obsolete messages even if TL has acquired a new user requirement.
# In addition, we should not determine the status of a task based on message cause_by.
# Consider replacing this in the future.
self._publish_message(message)
if self.is_software_task_finished(message):
tl.rc.memory.add(self.move_message_info_to_content(message))
from metagpt.utils.report import CURRENT_ROLE
role = CURRENT_ROLE.get(None)
if role:
CURRENT_ROLE.set(tl)
tl.finish_current_task()
CURRENT_ROLE.set(role)
else:
tl.finish_current_task()
if self.is_public_chat:
self._publish_message(message)
elif publicer == tl.profile:
if message.send_to == {"no one"}:
@ -81,9 +54,8 @@ class MGXEnv(Environment, SerializationMixin):
else:
# every regular message goes through team leader
message = self.move_message_info_to_content(message)
message.send_to.add(tl.name)
tl.put_message(message)
self._publish_message(message)
self.history.add(message)
@ -98,21 +70,6 @@ class MGXEnv(Environment, SerializationMixin):
# NOTE: Can be overwritten in remote setting
return "SUCCESS, human has received your reply. Refrain from resending duplicate messages. If you no longer need to take action, use the command end to stop."
def message_within_software_sop(self, message: Message) -> bool:
# Engineer, QaEngineer can be end of the SOP. Their msg requires routing outside.
members_concerned = [ProductManager, Architect, ProjectManager]
return message.sent_from in any_to_str_set(members_concerned)
def has_user_requirement(self, k=1) -> bool:
"""A heuristics to check if there is a recent user intervention"""
return any_to_str(UserRequirement) in [msg.cause_by for msg in self.history.get(k)]
def is_software_task_finished(self, message: Message) -> bool:
"""Use a hard-coded rule to check if one software task is finished"""
return message.cause_by in any_to_str_set([WritePRD, WriteDesign, WriteTasks, SummarizeCode]) or (
message.cause_by == any_to_str(WriteTest) and "Exceeding" in message.content
)
def move_message_info_to_content(self, message: Message) -> Message:
"""Two things here:
1. Convert role, since role field must be reserved for LLM API, and is limited to, for example, one of ["user", "assistant", "system"]
@ -122,9 +79,13 @@ class MGXEnv(Environment, SerializationMixin):
if converted_msg.role not in ["system", "user", "assistant"]:
converted_msg.role = "assistant"
sent_from = converted_msg.metadata[AGENT] if AGENT in converted_msg.metadata else converted_msg.sent_from
converted_msg.content = (
f"[Message] from {sent_from or 'User'} to {converted_msg.send_to}: {converted_msg.content}"
)
# When displaying send_to, change it to those who need to react and exclude those who only need to be aware, e.g.:
# send_to={<all>} -> Mike; send_to={Alice} -> Alice; send_to={Alice, <all>} -> Alice.
if converted_msg.send_to == {MESSAGE_ROUTE_TO_ALL}:
send_to = TEAMLEADER_NAME
else:
send_to = ", ".join({role for role in converted_msg.send_to if role != MESSAGE_ROUTE_TO_ALL})
converted_msg.content = f"[Message] from {sent_from or 'User'} to {send_to}: {converted_msg.content}"
return converted_msg
def attach_images(self, message: Message) -> Message:

View file

@ -231,6 +231,8 @@ QUICK_RESPONSE_SYSTEM_PROMPT = """
{role_info}
However, you MUST respond to the user message by yourself directly, DON'T ask your team members.
"""
# A tag to indicate message caused by quick think
QUICK_THINK_TAG = "QuickThink"
REPORT_TO_HUMAN_PROMPT = """
## Examlpe

View file

@ -29,6 +29,7 @@ from metagpt.prompts.di.role_zero import (
QUICK_THINK_EXAMPLES,
QUICK_THINK_PROMPT,
QUICK_THINK_SYSTEM_PROMPT,
QUICK_THINK_TAG,
REGENERATE_PROMPT,
REPORT_TO_HUMAN_PROMPT,
ROLE_INSTRUCTION,
@ -93,6 +94,7 @@ class RoleZero(Role):
experience_retriever: Annotated[ExpRetriever, Field(exclude=True)] = DummyExpRetriever()
# Others
observe_all_msg_from_buffer: bool = True
command_rsp: str = "" # the raw string containing the commands
commands: list[dict] = [] # commands to be executed
memory_k: int = 200 # number of memories (messages) to use as historical context
@ -396,12 +398,12 @@ class RoleZero(Role):
answer = await SearchEnhancedQA().run(query)
if answer:
self.rc.memory.add(AIMessage(content=answer, cause_by=RunCommand))
self.rc.memory.add(AIMessage(content=answer, cause_by=QUICK_THINK_TAG))
await self.reply_to_human(content=answer)
rsp_msg = AIMessage(
content="Complete run",
content=answer,
sent_from=self.name,
cause_by=RunCommand,
cause_by=QUICK_THINK_TAG,
)
return rsp_msg, intent_result

View file

@ -6,6 +6,7 @@ from pydantic import Field
from metagpt.actions.di.run_command import RunCommand
from metagpt.const import TEAMLEADER_NAME
from metagpt.prompts.di.role_zero import QUICK_THINK_TAG
from metagpt.prompts.di.team_leader import (
FINISH_CURRENT_TASK_CMD,
TL_INFO,
@ -61,13 +62,14 @@ class TeamLeader(RoleZero):
return await super()._think()
def publish_message(self, msg: Message, send_to="no one"):
"""Overwrite Role.publish_message, send to no one if called within Role.run, send to the specified role if called dynamically."""
"""Overwrite Role.publish_message, send to no one if called within Role.run (except for quick think), send to the specified role if called dynamically."""
if not msg:
return
if not self.rc.env:
# If env does not exist, do not publish the message
return
msg.send_to = send_to
if msg.cause_by != QUICK_THINK_TAG:
msg.send_to = send_to
self.rc.env.publish_message(msg, publicer=self.profile)
def publish_team_message(self, content: str, send_to: str):

View file

@ -42,8 +42,8 @@ class ProductManager(RoleZero):
# NOTE: The following init setting will only be effective when self.use_fixed_sop is changed to True
self.enable_memory = False
self.set_actions([PrepareDocuments(send_to=any_to_str(self)), WritePRD])
self._watch([UserRequirement, PrepareDocuments])
if self.use_fixed_sop:
self._watch([UserRequirement, PrepareDocuments])
self.rc.react_mode = RoleReactMode.BY_ORDER
def _update_tool_execution(self):

View file

@ -154,6 +154,7 @@ class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel):
# builtin variables
recovered: bool = False # to tag if a recovered role
latest_observed_msg: Optional[Message] = None # record the latest observed message when interrupted
observe_all_msg_from_buffer: bool = False # whether to save all msgs from buffer to memory for role's awareness
__hash__ = object.__hash__ # support Role as hashable type in `Environment.members`
@ -171,7 +172,9 @@ class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel):
self._check_actions()
self.llm.system_prompt = self._get_prefix()
self.llm.cost_manager = self.context.cost_manager
self._watch(kwargs.pop("watch", [UserRequirement]))
# if observe_all_msg_from_buffer, we should not use cause_by to select messages but observe all
if not self.observe_all_msg_from_buffer:
self._watch(kwargs.pop("watch", [UserRequirement]))
if self.latest_observed_msg:
self.recovered = True
@ -396,7 +399,12 @@ class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel):
self.rc.news = [
n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
]
self.rc.memory.add_batch(self.rc.news) # only save messages of interest into memory
if self.observe_all_msg_from_buffer:
# save all new messages from the buffer into memory, the role may not react to them but can be aware of them
self.rc.memory.add_batch(news)
else:
# only save messages of interest into memory
self.rc.memory.add_batch(self.rc.news)
self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None # record the latest observed msg
# Design Rules:

View file

@ -18,7 +18,7 @@ async def main(requirement="", enable_human_input=False, use_fixed_sop=False, al
else:
engineer = Engineer2()
env = MGXEnv(allow_bypass_team_leader=use_fixed_sop)
env = MGXEnv()
env.add_roles(
[
TeamLeader(),