rough scratch for tl and thinking command

This commit is contained in:
yzlin 2024-04-25 01:10:30 +08:00
parent ed8777db99
commit 1b57395d0e
6 changed files with 616 additions and 0 deletions

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,47 @@
from metagpt.environment.base_env import Environment
from metagpt.logs import get_human_input
from metagpt.schema import Message
class MGXEnv(Environment):
"""MGX Environment"""
history: dict[str, Message] = {} # redefine message history
def _publish_message(self, message: Message, peekable: bool = True) -> bool:
return super().publish_message(message, peekable)
def publish_message(self, message: Message, user_defined_recipient: str = "", publicer: str = "") -> bool:
"""let the team leader take over message publishing"""
tl = self.get_role("Team Leader")
if user_defined_recipient:
self._publish_message(message)
# bypass team leader, team leader only needs to know but not to react
tl.rc.memory.add(message)
elif publicer == tl.profile:
# message processed by team leader can be published now
self._publish_message(message)
else:
# every regular message goes through team leader
tl.put_message(message)
self.history[message.id] = message
return True
def forward_message(self, message_id: str) -> str:
if message_id not in self.history:
return f"invalid message_id {message_id}, not found in history."
msg = self.history[message_id]
return self._publish_message(msg)
async def ask_human(self, question: str) -> str:
# NOTE: Can be overwritten in remote setting
return get_human_input(question)
async def reply_to_human(self, message: str) -> str:
# NOTE: Can be overwritten in remote setting
return message

View file

@ -0,0 +1,182 @@
from metagpt.strategy.thinking_command import Command
def prepare_command_prompt(commands: list[Command]) -> str:
command_prompt = ""
for i, command in enumerate(commands):
command_prompt += f"{i+1}. {command.value.signature}:\n{command.value.desc}\n\n"
print(command_prompt)
return command_prompt
PLANNING_CMD_PROMPT = """
# Data Structure
class Task(BaseModel):
task_id: str = ""
dependent_task_ids: list[str] = []
instruction: str = ""
task_type: str = ""
assignee: str = ""
# Team Member Info
{team_info}
# Available Commands
{available_commands}
# Current Plan
{plan_status}
# Example
{example}
# Instructions
You are a team leader, and you are responsible for drafting tasks and routing tasks to your team members.
You should NOT assign consecutive tasks to the same team member, instead, assign an aggregated task (or the complete requirement) and let the team member to decompose it.
If plan is created, you should track the progress based on team member feedback message, and update plan accordingly, such as finish_current_task, reset_task, replace_task, etc.
Note:
1. If the requirement is a pure DATA-RELATED requirement, such as bug fixes, issue reporting, environment setup, terminal operations, pip install, web browsing, web scraping, web imitation, data science, data analysis, machine learning, deep learning, text-to-image etc. DON'T decompose it, assign a single task with the original user requirement as instruction directly to Data Analyst.
2. If the requirement is developing a software, game, app, or website, excluding the above data-related tasks, you should decompose the requirement into multiple tasks and assign them to different team members based on their expertise, usually the sequence of Product Manager -> Architect -> Project Manager -> Engineer -> QaEngine, each assigned ONE task.
3. If the requirement contains both DATA-RELATED part and software development part, you should decompose the software development part and assign them to different team members based on their expertise, and assign the DATA-RELATED part to Data Analyst David directly.
Pay close attention to the Example provided
You may use any of the available commands to create a plan or update the plan. You may output mutiple commands, they will be executed sequentially.
If you finish current task, you will automatically take the next task in the existing plan, use finish_task, DON'T append a new task.
# Your commands in a json array, in the following output format:
Some text indicating your thoughts, including how you categorize the requirement based on Note or how you should update the plan status. Then a json array of commands.
```json
[
{{
"command_name": str,
"args": {{"arg_name": arg_value, ...}}
}},
...
]
```
"""
ROUTING_CMD_PROMPT = """
# Team Member Info
{team_info}
# Available Commands
{available_commands}
# Current Plan
{plan_status}
# Example
{example}
# Instructions
You are a team leader, you can use publish_message or forward_message to team members, asking them to start their task.
Note:
1. You should carefully review the recent conversation, if there are messages from team members, forward them or withold them properly.
2. Prioritize forward_message if messages exist, only public_message if you want to instruct the team member yourself.
3. Pay attention to task dependency, don't assign a task to a team member if the dependent tasks are not finished yet.
4. Review plan status, think about what you should do next, you can use any of the Available Commands.
# Your commands in a json array, in the following output format:
```json
[
{{
"command_name": str,
"args": {{"arg_name": arg_value, ...}}
}},
...
]
"""
PLANNING_EXAMPLE = """
## example 1
User Requirement: Create a cli snake game
Explanation: The requirement is about software development. Assign each tasks to a different team member based on their expertise.
```json
[
{
"command_name": "append_task",
"args": {
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Create a product requirement document (PRD) outlining the features, user interface, and user experience of the CLI snake game.",
"assignee": "Alice"
}
},
{
"command_name": "append_task",
"args": {
"task_id": "2",
"dependent_task_ids": ["1"],
"instruction": "Design the software architecture for the CLI snake game, including the choice of programming language, libraries, and data flow.",
"assignee": "Bob"
}
},
{
"command_name": "append_task",
"args": {
"task_id": "3",
"dependent_task_ids": ["2"],
"instruction": "Break down the architecture into manageable tasks, identify task dependencies, and prepare a detailed task list for implementation.",
"assignee": "Eve"
}
},
{
"command_name": "append_task",
"args": {
"task_id": "4",
"dependent_task_ids": ["3"],
"instruction": "Implement the core game logic for the CLI snake game, including snake movement, food generation, and score tracking.",
"assignee": "Alex"
}
},
{
"command_name": "append_task",
"args": {
"task_id": "5",
"dependent_task_ids": ["4"],
"instruction": "Write comprehensive tests for the game logic and user interface to ensure functionality and reliability.",
"assignee": "Edward"
}
}
]
```
## example 2
User requirement: Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.
Explanation: DON'T decompose requirement if it is a DATA-RELATED task, assign a single task directly to Data Analyst David. He will manage the decomposition and implementation.
```json
[
{
"command_name": "append_task",
"args": {
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
"assignee": "David"
}
}
]
```
"""
ROUTING_EXAMPLE = """
## example 1: Forward a message from one team member to another
Conversation History:
[
...,
{'role': 'assistant', 'content': 'id: 739d9b4983fd4e97a0f78fde5e9ef158, from Alice(Product Manager) to {'Bob'}: {'docs': {'20240424153821.json': {'root_path': 'docs/prd', 'filename': '20240424153821.json', 'content': '{"Language":"en_us","Programming Language":"Python","Original Requirements":"create a cli snake game","Project Name":"snake_game","Product Goals":["Develop an intuitive and addictive snake game",...], ...}}}}},
{'role': 'assistant', 'content': 'Based on the feedback from Alice, the Product Manager, it seems that the PRD for the snake game has been successfully created. Since the PRD is complete and there are no indications of issues with the task, we can mark the current task as finished and move on to the next task in the plan. The next task is assigned to Bob, the Architect, who will be responsible for designing the software architecture for the game based on the PRD provided by Alice.\n\nHere are the commands to update the plan status:\n\n```json\n[\n {\n "command_name": "finish_current_task"\n }\n]\n```'}
]
Command:
```json
[
{
"command_name": "forward_message",
"args": {
"message_id": "739d9b4983fd4e97a0f78fde5e9ef158"
}
}
]
"""

View file

@ -0,0 +1,151 @@
from __future__ import annotations
import json
from pydantic import model_validator
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.prompts.di.team_leader import (
PLANNING_CMD_PROMPT,
PLANNING_EXAMPLE,
ROUTING_CMD_PROMPT,
ROUTING_EXAMPLE,
prepare_command_prompt,
)
from metagpt.roles import Role
from metagpt.schema import Message, Task, TaskResult
from metagpt.strategy.planner import Planner
from metagpt.strategy.thinking_command import Command
from metagpt.utils.common import CodeParser
class TeamLeader(Role):
name: str = "Tim"
profile: str = "Team Leader"
task_result: TaskResult = None
planning_commands: list[Command] = [
Command.APPEND_TASK,
Command.RESET_TASK,
Command.REPLACE_TASK,
Command.FINISH_CURRENT_TASK,
Command.REPLY_TO_HUMAN,
Command.PASS,
]
env_commands: list[Command] = [
Command.PUBLISH_MESSAGE,
Command.FORWARD_MESSAGE,
Command.ASK_HUMAN,
Command.REPLY_TO_HUMAN,
Command.PASS,
]
@model_validator(mode="after")
def set_plan(self) -> "TeamLeader":
self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=True)
return self
def _run_env_command(self, cmd):
assert isinstance(self.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv"
if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name:
self.rc.env.publish_message(Message(**cmd["args"]), publicer=self.profile)
elif cmd["command_name"] in Command.FORWARD_MESSAGE.cmd_name:
self.rc.env.forward_message(**cmd["args"])
elif cmd["command_name"] == Command.ASK_HUMAN.cmd_name:
self.rc.env.ask_human(**cmd["args"])
elif cmd["command_name"] == Command.REPLY_TO_HUMAN.cmd_name:
self.rc.env.reply_to_human(**cmd["args"])
def _run_internal_command(self, cmd):
if cmd["command_name"] == Command.APPEND_TASK.cmd_name:
self.planner.plan.append_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.RESET_TASK.cmd_name:
self.planner.plan.reset_task(**cmd["args"])
elif cmd["command_name"] == Command.REPLACE_TASK.cmd_name:
self.planner.plan.replace_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.FINISH_CURRENT_TASK.cmd_name:
self.planner.plan.current_task.update_task_result(task_result=self.task_result)
self.planner.plan.finish_current_task()
self.rc.working_memory.clear()
def run_commands(self, cmds):
print(*cmds, sep="\n")
for cmd in cmds:
self._run_env_command(cmd)
self._run_internal_command(cmd)
if self.planner.plan.is_plan_finished():
self._set_state(-1)
def get_memory(self) -> list[Message]:
mem = self.rc.memory.get()
for m in mem:
if m.role not in ["system", "user", "assistant"]:
m.content = f"id: {m.id[:10]}, from {m.role} to {m.send_to}: {m.content}"
m.role = "assistant"
return mem
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
self.commands = []
example = ""
if not self.planner.plan.goal:
user_requirement = self.get_memories()[-1].content
self.planner.plan.goal = user_requirement
example = PLANNING_EXAMPLE
# common info
team_info = ""
for role in self.rc.env.roles.values():
if role.profile == "TeamLeader":
continue
team_info += f"{role.name}: {role.profile}, {role.goal}\n"
# print(team_info)
# plan commands
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
for task in plan_status["tasks"]:
task.pop("code")
task.pop("result")
plan_prompt = PLANNING_CMD_PROMPT.format(
plan_status=plan_status,
team_info=team_info,
example=example,
available_commands=prepare_command_prompt(self.planning_commands),
)
context = self.llm.format_msg(self.get_memory() + [Message(content=plan_prompt, role="user")])
print(*context, sep="*" * 10 + "\n\n")
# breakpoint()
plan_rsp = await self.llm.aask(context)
plan_rsp_dict = json.loads(CodeParser.parse_code(block=None, text=plan_rsp))
self.commands.extend(plan_rsp_dict)
self.rc.memory.add(Message(content=plan_rsp, role="assistant"))
# routing commands
route_prompt = ROUTING_CMD_PROMPT.format(
plan_status=plan_status,
team_info=team_info,
example=ROUTING_EXAMPLE,
available_commands=prepare_command_prompt(self.env_commands),
)
context = self.llm.format_msg(self.get_memory() + [Message(content=route_prompt, role="user")])
print(*context, sep="*" * 10 + "\n\n")
# breakpoint()
route_rsp = await self.llm.aask(context)
route_rsp_dict = json.loads(CodeParser.parse_code(block=None, text=route_rsp))
self.commands.extend(route_rsp_dict)
self.rc.memory.add(Message(content=route_rsp, role="assistant"))
return True
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
self.run_commands(self.commands)
self.task_result = TaskResult(result="Success", is_success=True)
async def run(self, with_message=None) -> Message | None:
if await self._observe():
await self._think()
await self._act()

View file

@ -0,0 +1,66 @@
from enum import Enum
from pydantic import BaseModel
class CommandDef(BaseModel):
name: str
signature: str = ""
desc: str = ""
class Command(Enum):
# commands for planning
APPEND_TASK = CommandDef(
name="append_task",
signature="append_task(task_id: str, dependent_task_ids: list[str], instruction: str, assignee: str)",
desc="Append a new task with task_id (number) to the end of existing task sequences. If dependent_task_ids is not empty, the task will depend on the tasks with the ids in the list.",
)
RESET_TASK = CommandDef(
name="reset_task",
signature="reset_task(task_id: str)",
desc="Reset a task based on task_id, i.e. set Task.is_finished=False and request redo. This also resets all tasks depending on it.",
)
REPLACE_TASK = CommandDef(
name="replace_task",
signature="replace_task(task_id: str, new_dependent_task_ids: list[str], new_instruction: str, new_assignee: str)",
desc="Replace an existing task (can be current task) based on task_id, and reset all tasks depending on it.",
)
FINISH_CURRENT_TASK = CommandDef(
name="finish_current_task",
signature="finish_current_task()",
desc="Finishes current task, set Task.is_finished=True, set current task to next task",
)
# commands for env interaction
PUBLISH_MESSAGE = CommandDef(
name="publish_message",
signature="publish_message(content: str, send_to: str)",
desc="Publish a message to a team member, use member name to fill send_to args. You may copy the full original content or add additional information from upstream. This will make team members start their work. DONT omit any necessary info such as path, link, environment from original content to team members because you are their sole info source. However, if the original content is long or contains concrete info, you should forward the message using forward_message instead of publishing it.",
)
FORWARD_MESSAGE = CommandDef(
name="forward_message",
signature="forward_message(message_id: str)",
desc="Forward a message from one team member to another or all without any modification. This will make the recipient start their work, too.",
)
REPLY_TO_HUMAN = CommandDef(
name="reply_to_human",
signature="reply_to_human(content: str)",
desc="Reply to human user with the content provided. Use this when you have a clear answer or solution to the user's question.",
)
ASK_HUMAN = CommandDef(
name="ask_human",
signature="ask_human(question: str)",
desc="Use this when you fail the current task or if you are unsure of the situation encountered. Your response should contain a brief summary of your situation, ended with a clear and concise question.",
)
# common commands
PASS = CommandDef(
name="pass",
signature="pass",
desc="Pass and do nothing, if you don't think the plan needs to be updated nor a message to be published or forwarded. The reasons can be the latest message is unnecessary or obsolete, or you want to wait for more information before making a move.",
)
@property
def cmd_name(self):
return self.value.name