Merge branch 'gitlab/mgx_ops' into feature/rfc243

This commit is contained in:
莘权 马 2024-06-24 10:28:04 +08:00
commit 2c69ce4f9c
11 changed files with 203 additions and 185 deletions

View file

@ -159,7 +159,7 @@ class Environment(ExtEnv):
"""增加一个在当前环境的角色
Add a role in the current environment
"""
self.roles[role.profile] = role
self.roles[role.name] = role
role.set_env(self)
role.context = self.context
@ -168,7 +168,7 @@ class Environment(ExtEnv):
Add a batch of characters in the current environment
"""
for role in roles:
self.roles[role.profile] = role
self.roles[role.name] = role
for role in roles: # setup system message with roles
role.context = self.context

View file

@ -17,20 +17,33 @@ from metagpt.utils.common import any_to_str, any_to_str_set
class MGXEnv(Environment):
"""MGX Environment"""
# Before enabling TL to fully take over the routing, all software company roles need to be able to handle TL messages, which requires restructuring.
allow_bypass_team_leader: bool = True
# If True, fixed software sop bypassing TL is allowed, otherwise, TL will fully take over the routing
allow_bypass_team_leader: bool = False
direct_chat_roles: set[str] = set() # record direct chat: @role_name
def _publish_message(self, message: Message, peekable: bool = True) -> bool:
return super().publish_message(message, peekable)
def publish_message(self, message: Message, user_defined_recipient: str = "", publicer: str = "") -> bool:
"""let the team leader take over message publishing"""
tl = self.get_role("Team Leader")
tl = self.get_role("Tim") # TeamLeader's name is Tim
if user_defined_recipient:
# human user's direct chat message to a certain role
for role_name in message.send_to:
if self.get_role(role_name).is_idle:
# User starts a new direct chat with a certain role, expecting a direct chat response from the role; Other roles including TL should not be involved.
# If the role is not idle, it means the user helps the role with its current work, in this case, we handle the role's response message as usual.
self.direct_chat_roles.add(role_name)
self._publish_message(message)
# bypass team leader, team leader only needs to know but not to react
tl.rc.memory.add(self.move_message_info_to_content(message))
# # bypass team leader, team leader only needs to know but not to react (commented out because TL doesn't understand the message well in actual experiments)
# tl.rc.memory.add(self.move_message_info_to_content(message))
elif message.sent_from in self.direct_chat_roles:
# direct chat response from a certain role to human user, team leader and other roles in the env should not be involved, no need to publish
self.direct_chat_roles.remove(message.sent_from)
elif (
self.allow_bypass_team_leader
@ -106,3 +119,6 @@ class MGXEnv(Environment):
sent_from = converted_msg.metadata[AGENT] if AGENT in converted_msg.metadata else converted_msg.sent_from
converted_msg.content = f"from {sent_from} to {converted_msg.send_to}: {converted_msg.content}"
return converted_msg
def __repr__(self):
return "MGXEnv()"

View file

@ -20,7 +20,7 @@ class Task(BaseModel):
# Available Commands
{available_commands}
Special Command: Use {{"command_name": "pass"}} to do nothing and {{"command_name": "end"}} to indicate completion of all requirements and the end of actions.
Special Command: Use {{"command_name": "end"}} to do nothing or indicate completion of all requirements and the end of actions.
# Current Plan
{plan_status}

View file

@ -31,5 +31,6 @@ FINISH_CURRENT_TASK_CMD = """
"command_name": "Plan.finish_current_task",
"args": {{}}
}
]
```
"""

View file

@ -41,6 +41,7 @@ class DataAnalyst(DataInterpreter):
# Command.PASS,
]
commands: list[dict] = [] # issued commands to be executed
user_requirement: str = ""
@model_validator(mode="after")
def set_plan_and_tool(self) -> "DataInterpreter":
@ -68,9 +69,6 @@ class DataAnalyst(DataInterpreter):
self.user_requirement = self.get_memories()[-1].content
self.planner.plan.goal = self.user_requirement
example = KeywordExpRetriever().retrieve(self.user_requirement)
else:
self.working_memory.add_batch(self.rc.news)
# TODO: implement experience retrieval in multi-round setting
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
# for task in plan_status["tasks"]:
@ -82,10 +80,11 @@ class DataAnalyst(DataInterpreter):
available_commands=prepare_command_prompt(self.available_commands),
)
context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")])
async with ThoughtReporter():
# print(*context, sep="\n" + "*" * 5 + "\n")
async with ThoughtReporter(enable_llm_stream=True):
rsp = await self.llm.aask(context)
self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp))
self.rc.memory.add(Message(content=rsp, role="assistant"))
self.rc.working_memory.add(Message(content=rsp, role="assistant"))
await run_commands(self, self.commands, self.rc.working_memory)
@ -118,7 +117,11 @@ class DataAnalyst(DataInterpreter):
rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act
while actions_taken < self.rc.max_react_loop:
# NOTE: Diff 2: Keep observing within _react, news will go into memory, allowing adapting to new info
# add news from self._observe, the one called in self.run, consider removing when switching from working_memory to memory
self.working_memory.add_batch(self.rc.news)
await self._observe()
# add news from this self._observe, we need twice because _observe rewrites rc.news
self.working_memory.add_batch(self.rc.news)
# think
has_todo = await self._think()

View file

@ -74,7 +74,7 @@ class DataInterpreter(Role):
return True
prompt = REACT_THINK_PROMPT.format(user_requirement=self.user_requirement, context=context)
async with ThoughtReporter():
async with ThoughtReporter(enable_llm_stream=True):
rsp = await self.llm.aask(prompt)
rsp_dict = json.loads(CodeParser.parse_code(text=rsp))
self.working_memory.add(Message(content=rsp_dict["thoughts"], role="assistant"))

View file

@ -53,7 +53,6 @@ class RoleZero(Role):
experience_retriever: ExpRetriever = DummyExpRetriever()
# Others
user_requirement: str = ""
command_rsp: str = "" # the raw string containing the commands
commands: list[dict] = [] # commands to be executed
memory_k: int = 20 # number of memories (messages) to use as historical context
@ -106,8 +105,7 @@ class RoleZero(Role):
return False
if not self.planner.plan.goal:
self.user_requirement = self.get_memories()[-1].content
self.planner.plan.goal = self.user_requirement
self.planner.plan.goal = self.get_memories()[-1].content
### 1. Experience ###
example = self._retrieve_experience()
@ -129,7 +127,7 @@ class RoleZero(Role):
)
context = self.llm.format_msg(self.rc.memory.get(self.memory_k) + [UserMessage(content=prompt)])
# print(*context, sep="\n" + "*" * 5 + "\n")
async with ThoughtReporter():
async with ThoughtReporter(enable_llm_stream=True):
self.command_rsp = await self.llm.aask(context, system_msgs=self.system_msg)
self.rc.memory.add(AIMessage(content=self.command_rsp))

View file

@ -250,164 +250,165 @@ class TRDExpRetriever(ExpRetriever):
"""
TL_EXAMPLE = """
## example 1
User Requirement: Create a cli snake game using Python.
Explanation: The requirement is about software development. Assign each tasks to a different team member based on their expertise. When publishing message to Product Manager, we copy original user requirement directly to ensure no information loss.
```json
[
{
"command_name": "Plan.append_task",
"args": {
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Create a product requirement document (PRD) outlining the features, user interface, and user experience of the CLI python snake game.",
"assignee": "Alice"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "2",
"dependent_task_ids": ["1"],
"instruction": "Design the software architecture for the CLI snake game, including the choice of programming language, libraries, and data flow.",
"assignee": "Bob"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "3",
"dependent_task_ids": ["2"],
"instruction": "Break down the architecture into manageable tasks, identify task dependencies, and prepare a detailed task list for implementation.",
"assignee": "Eve"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "4",
"dependent_task_ids": ["3"],
"instruction": "Implement the core game logic for the CLI snake game, including snake movement, food generation, and score tracking.",
"assignee": "Alex"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "5",
"dependent_task_ids": ["4"],
"instruction": "Write comprehensive tests for the game logic and user interface to ensure functionality and reliability.",
"assignee": "Edward"
}
},
{
"command_name": "TeamLeader.publish_message",
"args": {
"content": "Create a cli snake game using Python",
"send_to": "Alice"
}
},
{
"command_name": "RoleZero.reply_to_human",
"args": {
"content": "I have assigned the tasks to the team members. Alice will create the PRD, Bob will design the software architecture, Eve will break down the architecture into tasks, Alex will implement the core game logic, and Edward will write comprehensive tests. The team will work on the project accordingly",
}
},
{
"command_name": "end"
}
]
```
## example 2
User Requirement: Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.
Explanation: DON'T decompose requirement if it is a DATA-RELATED task, assign a single task directly to Data Analyst David. He will manage the decomposition and implementation.
```json
[
{
"command_name": "Plan.append_task",
"args": {
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
"assignee": "David"
}
},
{
"command_name": "TeamLeader.publish_message",
"args": {
"content": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
"send_to": "David"
}
},
{
"command_name": "RoleZero.reply_to_human",
"args": {
"content": "I have assigned the task to David. He will break down the task further by himself and starts solving it.",
}
},
{
"command_name": "end"
}
]
```
## example 3
Conversation History:
[
...,
{'role': 'assistant', 'content': 'from Alice(Product Manager) to {'Bob'}: {'docs': {'20240424153821.json': {'root_path': 'docs/prd', 'filename': '20240424153821.json', 'content': '{"Language":"en_us","Programming Language":"Python","Original Requirements":"create a cli snake game","Project Name":"snake_game","Product Goals":["Develop an intuitive and addictive snake game",...], ...}}}}},
]
Explanation: You received a message from Alice, the Product Manager, that she has completed the PRD, use Plan.finish_current_task to mark her task as finished and moves the plan to the next task. Based on plan status, next task is for Bob (Architect), publish a message asking him to start. The message content should contain important path info.
```json
[
{
"command_name": "Plan.finish_current_task",
"args": {}
},
{
"command_name": "TeamLeader.publish_message",
"args": {
"content": "Please design the software architecture for the snake game based on the PRD created by Alice. The PRD is at 'docs/prd/20240424153821.json'. Include the choice of programming language, libraries, and data flow, etc.",
"send_to": "Bob"
}
},
{
"command_name": "RoleZero.reply_to_human",
"args": {
"content": "Alice has completed the PRD. I have marked her task as finished and sent the PRD to Bob. Bob will work on the software architecture.",
}
},
{
"command_name": "end"
}
]
```
## example 4
User Question: how does the project go?
Explanation: The user is asking for a general update on the project status. Give a straight answer about the current task the team is working on and provide a summary of the completed tasks.
```json
[
{
"command_name": "RoleZero.reply_to_human",
"args": {
"content": "The team is currently working on ... We have completed ...",
}
},
{
"command_name": "end"
}
]
```
"""
class SimpleExpRetriever(ExpRetriever):
"""A simple experience retriever that returns manually crafted examples."""
EXAMPLE: str = """
## example 1
User Requirement: Create a cli snake game using Python.
Explanation: The requirement is about software development. Assign each tasks to a different team member based on their expertise. When publishing message to Product Manager, we copy original user requirement directly to ensure no information loss.
```json
[
{
"command_name": "Plan.append_task",
"args": {
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Create a product requirement document (PRD) outlining the features, user interface, and user experience of the CLI python snake game.",
"assignee": "Alice"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "2",
"dependent_task_ids": ["1"],
"instruction": "Design the software architecture for the CLI snake game, including the choice of programming language, libraries, and data flow.",
"assignee": "Bob"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "3",
"dependent_task_ids": ["2"],
"instruction": "Break down the architecture into manageable tasks, identify task dependencies, and prepare a detailed task list for implementation.",
"assignee": "Eve"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "4",
"dependent_task_ids": ["3"],
"instruction": "Implement the core game logic for the CLI snake game, including snake movement, food generation, and score tracking.",
"assignee": "Alex"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "5",
"dependent_task_ids": ["4"],
"instruction": "Write comprehensive tests for the game logic and user interface to ensure functionality and reliability.",
"assignee": "Edward"
}
},
{
"command_name": "TeamLeader.publish_message",
"args": {
"content": "Create a cli snake game using Python",
"send_to": "Alice"
}
},
{
"command_name": "RoleZero.reply_to_human",
"args": {
"content": "I have assigned the tasks to the team members. Alice will create the PRD, Bob will design the software architecture, Eve will break down the architecture into tasks, Alex will implement the core game logic, and Edward will write comprehensive tests. The team will work on the project accordingly",
}
},
{
"command_name": "end"
}
]
```
## example 2
User Requirement: Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.
Explanation: DON'T decompose requirement if it is a DATA-RELATED task, assign a single task directly to Data Analyst David. He will manage the decomposition and implementation.
```json
[
{
"command_name": "Plan.append_task",
"args": {
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
"assignee": "David"
}
},
{
"command_name": "TeamLeader.publish_message",
"args": {
"content": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
"send_to": "David"
}
},
{
"command_name": "RoleZero.reply_to_human",
"args": {
"content": "I have assigned the task to David. He will break down the task further by himself and starts solving it.",
}
},
{
"command_name": "end"
}
]
```
## example 3
Conversation History:
[
...,
{'role': 'assistant', 'content': 'from Alice(Product Manager) to {'Bob'}: {'docs': {'20240424153821.json': {'root_path': 'docs/prd', 'filename': '20240424153821.json', 'content': '{"Language":"en_us","Programming Language":"Python","Original Requirements":"create a cli snake game","Project Name":"snake_game","Product Goals":["Develop an intuitive and addictive snake game",...], ...}}}}},
]
Explanation: You received a message from Alice, the Product Manager, that she has completed the PRD, use Plan.finish_current_task to mark her task as finished and moves the plan to the next task. Based on plan status, next task is for Bob (Architect), publish a message asking him to start. The message content should contain important path info.
```json
[
{
"command_name": "Plan.finish_current_task",
"args": {}
},
{
"command_name": "TeamLeader.publish_message",
"args": {
"content": "Please design the software architecture for the snake game based on the PRD created by Alice. The PRD is at 'docs/prd/20240424153821.json'. Include the choice of programming language, libraries, and data flow, etc.",
"send_to": "Bob"
}
},
{
"command_name": "RoleZero.reply_to_human",
"args": {
"content": "Alice has completed the PRD. I have marked her task as finished and sent the PRD to Bob. Bob will work on the software architecture.",
}
},
{
"command_name": "end"
}
]
```
## example 4
User Question: how does the project go?
Explanation: The user is asking for a general update on the project status. Give a straight answer about the current task the team is working on and provide a summary of the completed tasks.
```json
[
{
"command_name": "RoleZero.reply_to_human",
"args": {
"content": "The team is currently working on ... We have completed ...",
}
},
{
"command_name": "end"
}
]
```
"""
def retrieve(self, context: str = "") -> str:
return self.EXAMPLE
return TL_EXAMPLE
class KeywordExpRetriever(ExpRetriever):

View file

@ -1,5 +1,4 @@
import os
import re
import shutil
import subprocess
@ -27,15 +26,16 @@ class Editor:
def write(self, path: str, content: str):
"""Write the whole content to a file. When used, make sure content arg contains the full content of the file."""
if len(re.findall(r"\\n", content)) >= 5:
# A very raw rule to correct the content: Many \\n suggests all new line characters are mistaken as \\n whereas the correct one should be \n
if "\n" not in content and "\\n" in content:
# A very raw rule to correct the content: If 'content' lacks actual newlines ('\n') but includes '\\n', consider
# replacing them with '\n' to potentially correct mistaken representations of newline characters.
content = content.replace("\\n", "\n")
directory = os.path.dirname(path)
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
self.resource.report(path, "path")
# self.resource.report(path, "path")
def read(self, path: str) -> FileBlock:
"""Read the whole content of a file."""

View file

@ -275,7 +275,7 @@ class CodeParser:
def parse_code(cls, text: str, lang: str = "", block: Optional[str] = None) -> str:
if block:
text = cls.parse_block(block, text)
pattern = rf"```{lang}.*?\s+(.*?)```"
pattern = rf"```{lang}.*?\s+(.*?)\n```"
match = re.search(pattern, text, re.DOTALL)
if match:
code = match.group(1)

View file

@ -51,7 +51,6 @@ class ResourceReporter(BaseModel):
block: BlockType = Field(description="The type of block that is reporting the resource")
uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource")
is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream")
enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting")
callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent")
_llm_task: Optional[asyncio.Task] = PrivateAttr(None)
@ -153,17 +152,14 @@ class ResourceReporter(BaseModel):
def __enter__(self):
"""Enter the synchronous streaming callback context."""
self.is_chunk = True
return self
def __exit__(self, *args, **kwargs):
"""Exit the synchronous streaming callback context."""
self.report(None, END_MARKER_NAME)
self.is_chunk = False
async def __aenter__(self):
"""Enter the asynchronous streaming callback context."""
self.is_chunk = True
if self.enable_llm_stream:
queue = create_llm_stream_queue()
self._llm_task = asyncio.create_task(self._llm_stream_report(queue))
@ -171,15 +167,18 @@ class ResourceReporter(BaseModel):
async def __aexit__(self, *args, **kwargs):
"""Exit the asynchronous streaming callback context."""
self.is_chunk = False
if self.enable_llm_stream:
self._llm_task.cancel()
await get_llm_stream_queue().put(None)
await self._llm_task
self._llm_task = None
await self.async_report(None, END_MARKER_NAME)
async def _llm_stream_report(self, queue: asyncio.Queue):
while self.is_chunk:
await self.async_report(await queue.get(), "content")
while True:
data = await queue.get()
if data is None:
return
await self.async_report(data, "content")
async def wait_llm_stream_report(self):
"""Wait for the LLM stream report to complete."""