fix conflict

This commit is contained in:
seehi 2024-04-26 21:47:43 +08:00
commit 0c29c298c2
21 changed files with 775 additions and 29 deletions

View file

@ -13,6 +13,37 @@ llm:
# - gpt-4 8k: "gpt-4"
# See for more: https://azure.microsoft.com/en-us/pricing/details/cognitive-services/openai-service/
# Role's custom configuration
roles:
- role: "ProductManager" # role's className or role's role_id
llm:
api_type: "openai" # or azure / ollama / open_llm etc. Check LLMType for more options
base_url: "YOUR_BASE_URL"
api_key: "YOUR_API_KEY"
proxy: "YOUR_PROXY" # for LLM API requests
model: "gpt-4-turbo-1106"
- role: "Architect"
llm:
api_type: "openai" # or azure / ollama / open_llm etc. Check LLMType for more options
base_url: "YOUR_BASE_URL"
api_key: "YOUR_API_KEY"
proxy: "YOUR_PROXY" # for LLM API requests
model: "gpt-35-turbo"
- role: "ProjectManager"
llm:
api_type: "azure"
base_url: "YOUR_BASE_URL"
api_key: "YOUR_API_KEY"
api_version: "YOUR_API_VERSION"
model: "gpt-4-1106"
- role: "Engineer"
llm:
api_type: "azure"
base_url: "YOUR_BASE_URL"
api_key: "YOUR_API_KEY"
api_version: "YOUR_API_VERSION"
model: "gpt-35-turbo-1106"
repair_llm_output: true # when the output is not a valid json, try to repair it
proxy: "YOUR_PROXY" # for tools like requests, playwright, selenium, etc.

View file

@ -3,7 +3,7 @@ from __future__ import annotations
from typing import Tuple
from metagpt.actions import Action
from metagpt.logs import logger
from metagpt.logs import get_human_input, logger
from metagpt.schema import Message, Plan
@ -50,7 +50,7 @@ class AskReview(Action):
"Please type your review below:\n"
)
rsp = input(prompt)
rsp = await get_human_input(prompt)
if rsp.lower() in ReviewConst.EXIT_WORDS:
exit()

View file

@ -0,0 +1,5 @@
from metagpt.actions import Action
class RunCommand(Action):
"""A dummy RunCommand action used as a symbol only"""

View file

@ -170,7 +170,7 @@ class WriteCode(Action):
if not task_doc.content:
task_doc = project_repo.docs.task.get(filename=task_doc.filename)
m = json.loads(task_doc.content)
code_filenames = m.get(TASK_LIST.key, []) if use_inc else m.get(REFINED_TASK_LIST.key, [])
code_filenames = m.get(TASK_LIST.key, []) if not use_inc else m.get(REFINED_TASK_LIST.key, [])
codes = []
src_file_repo = project_repo.srcs

View file

@ -15,6 +15,7 @@ from metagpt.configs.browser_config import BrowserConfig
from metagpt.configs.llm_config import LLMConfig, LLMType
from metagpt.configs.mermaid_config import MermaidConfig
from metagpt.configs.redis_config import RedisConfig
from metagpt.configs.role_custom_config import RoleCustomConfig
from metagpt.configs.s3_config import S3Config
from metagpt.configs.search_config import SearchConfig
from metagpt.configs.workspace_config import WorkspaceConfig
@ -76,6 +77,9 @@ class Config(CLIParams, YamlModel):
azure_tts_subscription_key: str = ""
azure_tts_region: str = ""
# Role's custom configuration
roles: Optional[List[RoleCustomConfig]] = None
@classmethod
def from_home(cls, path):
"""Load config from ~/.metagpt/config2.yaml"""

View file

@ -0,0 +1,19 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2024/4/22 16:33
@Author : Justin
@File : role_custom_config.py
"""
from metagpt.configs.llm_config import LLMConfig
from metagpt.utils.yaml_model import YamlModel
class RoleCustomConfig(YamlModel):
"""custom config for roles
role: role's className or role's role_id
To be expanded
"""
role: str = ""
llm: LLMConfig

View file

@ -19,6 +19,7 @@ from metagpt.environment.api.env_api import (
)
from metagpt.environment.base_env_space import BaseEnvAction, BaseEnvObsParams
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.schema import Message
from metagpt.utils.common import get_function_schema, is_coroutine_func, is_send_to
@ -131,7 +132,7 @@ class Environment(ExtEnv):
desc: str = Field(default="") # 环境描述
roles: dict[str, SerializeAsAny["Role"]] = Field(default_factory=dict, validate_default=True)
member_addrs: Dict["Role", Set] = Field(default_factory=dict, exclude=True)
history: str = "" # For debug
history: Memory = Field(default_factory=Memory) # For debug
context: Context = Field(default_factory=Context, exclude=True)
def reset(
@ -190,7 +191,7 @@ class Environment(ExtEnv):
found = True
if not found:
logger.warning(f"Message no recipients: {message.dump()}")
self.history += f"\n{message}" # For debug
self.history.add(message) # For debug
return True

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,95 @@
from metagpt.actions import (
UserRequirement,
WriteDesign,
WritePRD,
WriteTasks,
WriteTest,
)
from metagpt.actions.summarize_code import SummarizeCode
from metagpt.environment.base_env import Environment
from metagpt.logs import get_human_input
from metagpt.roles import (
Architect,
Engineer,
ProductManager,
ProjectManager,
QaEngineer,
Role,
)
from metagpt.schema import Message
from metagpt.utils.common import any_to_str, any_to_str_set
class MGXEnv(Environment):
"""MGX Environment"""
def _publish_message(self, message: Message, peekable: bool = True) -> bool:
return super().publish_message(message, peekable)
def publish_message(self, message: Message, user_defined_recipient: str = "", publicer: str = "") -> bool:
"""let the team leader take over message publishing"""
tl = self.get_role("Team Leader")
if user_defined_recipient:
self._publish_message(message)
# bypass team leader, team leader only needs to know but not to react
tl.rc.memory.add(self.move_message_info_to_content(message))
elif self.message_within_software_sop(message) and not self.has_user_requirement():
# Quick routing for messages within software SOP, bypassing TL.
# Use rules to check for user intervention and to finish task.
# NOTE: This escapes TL's supervision and has pitfalls such as routing obsolete messages even if TL has acquired a new user requirement.
# In addition, we should not determine the status of a task based on message cause_by.
# Consider replacing this in the future.
self._publish_message(message)
if self.is_software_task_finished(message):
tl.rc.memory.add(self.move_message_info_to_content(message))
tl.finish_current_task()
elif publicer == tl.profile:
# message processed by team leader can be published now
self._publish_message(message)
else:
# every regular message goes through team leader
message = self.move_message_info_to_content(message)
message.send_to.add(tl.name)
tl.put_message(message)
self.history.add(message)
return True
async def ask_human(self, question: str, sent_from: Role = None) -> str:
# NOTE: Can be overwritten in remote setting
return await get_human_input(question)
async def reply_to_human(self, content: str, sent_from: Role = None) -> str:
# NOTE: Can be overwritten in remote setting
return content
def message_within_software_sop(self, message: Message) -> bool:
return message.sent_from in any_to_str_set([ProductManager, Architect, ProjectManager, Engineer, QaEngineer])
def has_user_requirement(self, k=2) -> bool:
"""A heuristics to check if there is a recent user intervention"""
return any_to_str(UserRequirement) in [msg.cause_by for msg in self.history.get(k)]
def is_software_task_finished(self, message: Message) -> bool:
"""Use a hard-coded rule to check if one software task is finished"""
return message.cause_by in any_to_str_set([WritePRD, WriteDesign, WriteTasks, SummarizeCode]) or (
message.cause_by == any_to_str(WriteTest) and "Exceeding" in message.content
)
def move_message_info_to_content(self, message: Message) -> Message:
"""Two things here:
1. Convert role, since role field must be reserved for LLM API, and is limited to, for example, one of ["user", "assistant", "system"]
2. Add sender and recipient info to content, making TL aware, since LLM API only takes content as input
"""
if message.role in ["system", "user", "assistant"]:
sent_from = message.sent_from
else:
sent_from = message.role
message.role = "assistant"
message.content = f"from {sent_from} to {message.send_to}: {message.content}"
return message

View file

@ -9,6 +9,7 @@
from __future__ import annotations
import asyncio
import inspect
import sys
from contextvars import ContextVar
from datetime import datetime
@ -77,6 +78,14 @@ async def log_tool_output_async(output: ToolLogItem | list[ToolLogItem], tool_na
await _tool_output_log_async(output=output, tool_name=tool_name)
async def get_human_input(prompt: str = ""):
"""interface for getting human input, can be set to get input from different sources with set_human_input_func"""
if inspect.iscoroutinefunction(_get_human_input):
return await _get_human_input(prompt)
else:
return _get_human_input(prompt)
def set_llm_stream_logfunc(func):
global _llm_stream_log
_llm_stream_log = func
@ -93,6 +102,11 @@ async def set_tool_output_logfunc_async(func):
_tool_output_log_async = func
def set_human_input_func(func):
global _get_human_input
_get_human_input = func
_llm_stream_log = partial(print, end="")
@ -124,3 +138,5 @@ def get_llm_stream_queue():
The asyncio.Queue instance if set, otherwise None.
"""
return LLM_STREAM_QUEUE.get(None)
_get_human_input = input # get human input from console by default

View file

@ -0,0 +1,70 @@
from metagpt.strategy.thinking_command import Command
def prepare_command_prompt(commands: list[Command]) -> str:
command_prompt = ""
for i, command in enumerate(commands):
command_prompt += f"{i+1}. {command.value.signature}:\n{command.value.desc}\n\n"
return command_prompt
CMD_PROMPT = """
# Data Structure
class Task(BaseModel):
task_id: str = ""
dependent_task_ids: list[str] = []
instruction: str = ""
task_type: str = ""
assignee: str = ""
# Team Member Info
{team_info}
# Available Commands
{available_commands}
# Current Plan
{plan_status}
# Example
{example}
# Instructions
You are a team leader, and you are responsible for drafting tasks and routing tasks to your team members.
You should NOT assign consecutive tasks to the same team member, instead, assign an aggregated task (or the complete requirement) and let the team member to decompose it.
When creating a new plan involving multiple members, create all tasks at once.
If plan is created, you should track the progress based on team member feedback message, and update plan accordingly, such as finish_current_task, reset_task, replace_task, etc.
You should publish_message to team members, asking them to start their task.
Pay close attention to new user message, review the conversation history, use reply_to_human to respond to the user directly, DON'T ask your team members.
Note:
1. If the requirement is a pure DATA-RELATED requirement, such as bug fixes, issue reporting, environment setup, terminal operations, pip install, web browsing, web scraping, web searching, web imitation, data science, data analysis, machine learning, deep learning, text-to-image etc. DON'T decompose it, assign a single task with the original user requirement as instruction directly to Data Analyst.
2. If the requirement is developing a software, game, app, or website, excluding the above data-related tasks, you should decompose the requirement into multiple tasks and assign them to different team members based on their expertise, usually the sequence of Product Manager -> Architect -> Project Manager -> Engineer -> QaEngine, each assigned ONE task.
3. If the requirement contains both DATA-RELATED part mentioned in 1 and software development part mentioned in 2, you should decompose the software development part and assign them to different team members based on their expertise, and assign the DATA-RELATED part to Data Analyst David directly.
Pay close attention to the Example provided
You may use any of the available commands to create a plan or update the plan. You may output mutiple commands, they will be executed sequentially.
If you finish current task, you will automatically take the next task in the existing plan, use finish_task, DON'T append a new task.
# Your commands in a json array, in the following output format, always output a json array, if there is nothing to do, use the pass command:
Some text indicating your thoughts, such as how you categorize the requirement based on Note (is it 1., 2., or 3.?) or how you should update the plan status. Then a json array of commands.
```json
[
{{
"command_name": str,
"args": {{"arg_name": arg_value, ...}}
}},
...
]
```
"""
FINISH_CURRENT_TASK_CMD = """
```json
[
{
"command_name": "finish_current_task",
"args": {{}}
}
```
"""

View file

@ -14,7 +14,7 @@ from metagpt.roles import Role
from metagpt.schema import Message, Task, TaskResult
from metagpt.strategy.task_type import TaskType
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.utils.common import CodeParser, role_raise_decorator
from metagpt.utils.common import CodeParser
REACT_THINK_PROMPT = """
# User Requirement
@ -62,7 +62,7 @@ class DataInterpreter(Role):
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
user_requirement = self.get_memories()[0].content
user_requirement = self.get_memories()[-1].content
context = self.working_memory.get()
if not context:
@ -86,6 +86,7 @@ class DataInterpreter(Role):
return Message(content=code, role="assistant", cause_by=WriteAnalysisCode)
async def _plan_and_act(self) -> Message:
self._set_state(0)
try:
rsp = await super()._plan_and_act()
await self.execute_code.terminate()
@ -153,7 +154,7 @@ class DataInterpreter(Role):
logger.info(f"ready to {todo.name}")
use_reflection = counter > 0 and self.use_reflection # only use reflection after the first trial
user_requirement = self.get_memories()[0].content # issue: 1多次用户交互时永远只读用户的第1次request2prerequisite没处理
user_requirement = self.get_memories()[-1].content
code = await todo.run(
user_requirement=user_requirement,
@ -186,11 +187,3 @@ class DataInterpreter(Role):
print(result)
data_info = DATA_INFO.format(info=result)
self.working_memory.add(Message(content=data_info, role="user", cause_by=CheckData))
@role_raise_decorator
async def run(self, with_message=None) -> Message | None:
if not self.rc.todo:
self.set_actions([WriteAnalysisCode])
self._set_state(0)
return await super().run(with_message)

View file

@ -0,0 +1,131 @@
from __future__ import annotations
import json
from pydantic import model_validator
from metagpt.actions.di.run_command import RunCommand
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.prompts.di.team_leader import (
CMD_PROMPT,
FINISH_CURRENT_TASK_CMD,
prepare_command_prompt,
)
from metagpt.roles import Role
from metagpt.schema import Message, Task, TaskResult
from metagpt.strategy.experience_retriever import SimpleExpRetriever
from metagpt.strategy.planner import Planner
from metagpt.strategy.thinking_command import Command
from metagpt.utils.common import CodeParser
class TeamLeader(Role):
name: str = "Tim"
profile: str = "Team Leader"
task_result: TaskResult = None
commands: list[Command] = [
Command.APPEND_TASK,
Command.RESET_TASK,
Command.REPLACE_TASK,
Command.FINISH_CURRENT_TASK,
Command.PUBLISH_MESSAGE,
Command.ASK_HUMAN,
Command.REPLY_TO_HUMAN,
Command.PASS,
]
@model_validator(mode="after")
def set_plan(self) -> "TeamLeader":
self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=True)
return self
def _run_env_command(self, cmd):
assert isinstance(self.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv"
if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name:
self.publish_message(Message(**cmd["args"]))
elif cmd["command_name"] == Command.ASK_HUMAN.cmd_name:
self.rc.env.ask_human(sent_from=self, **cmd["args"])
elif cmd["command_name"] == Command.REPLY_TO_HUMAN.cmd_name:
self.rc.env.reply_to_human(sent_from=self, **cmd["args"])
def _run_internal_command(self, cmd):
if cmd["command_name"] == Command.APPEND_TASK.cmd_name:
self.planner.plan.append_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.RESET_TASK.cmd_name:
self.planner.plan.reset_task(**cmd["args"])
elif cmd["command_name"] == Command.REPLACE_TASK.cmd_name:
self.planner.plan.replace_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.FINISH_CURRENT_TASK.cmd_name:
self.planner.plan.current_task.update_task_result(task_result=self.task_result)
self.planner.plan.finish_current_task()
self.rc.working_memory.clear()
def run_commands(self, cmds):
print(*cmds, sep="\n")
for cmd in cmds:
self._run_env_command(cmd)
self._run_internal_command(cmd)
if self.planner.plan.is_plan_finished():
self._set_state(-1)
def get_memory(self, k=10) -> list[Message]:
"""A wrapper with default value"""
return self.rc.memory.get(k=k)
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
self.commands = []
if not self.planner.plan.goal:
user_requirement = self.get_memories()[-1].content
self.planner.plan.goal = user_requirement
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
for task in plan_status["tasks"]:
task.pop("code")
task.pop("result")
team_info = ""
for role in self.rc.env.roles.values():
if role.profile == "TeamLeader":
continue
team_info += f"{role.name}: {role.profile}, {role.goal}\n"
example = SimpleExpRetriever().retrieve()
prompt = CMD_PROMPT.format(
plan_status=plan_status,
team_info=team_info,
example=example,
available_commands=prepare_command_prompt(self.commands),
)
context = self.llm.format_msg(self.get_memory() + [Message(content=prompt, role="user")])
rsp = await self.llm.aask(context)
rsp_dict = json.loads(CodeParser.parse_code(block=None, text=rsp))
self.commands.extend(rsp_dict)
self.rc.memory.add(Message(content=rsp, role="assistant"))
return True
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
self.run_commands(self.commands)
self.task_result = TaskResult(result="Success", is_success=True)
msg = Message(content="Commands executed", send_to="no one") # a dummy message to conform to the interface
self.rc.memory.add(msg)
return msg
def publish_message(self, msg):
"""If the role belongs to env, then the role's messages will be broadcast to env"""
if not msg:
return
if not self.rc.env:
# If env does not exist, do not publish the message
return
msg.sent_from = self.profile
msg.cause_by = RunCommand
self.rc.env.publish_message(msg, publicer=self.profile)
def finish_current_task(self):
self.planner.plan.finish_current_task()
self.rc.memory.add(Message(content=FINISH_CURRENT_TASK_CMD, role="assistant"))

View file

@ -9,7 +9,7 @@
from metagpt.actions import UserRequirement, WritePRD
from metagpt.actions.prepare_documents import PrepareDocuments
from metagpt.roles.role import Role
from metagpt.roles.role import Role, RoleReactMode
from metagpt.utils.common import any_to_name
@ -35,7 +35,8 @@ class ProductManager(Role):
self.set_actions([PrepareDocuments, WritePRD])
self._watch([UserRequirement, PrepareDocuments])
self.todo_action = any_to_name(PrepareDocuments)
self.rc.react_mode = RoleReactMode.BY_ORDER
self.todo_action = any_to_name(WritePRD)
async def _think(self) -> bool:
"""Decide what to do"""

View file

@ -43,7 +43,6 @@ from metagpt.utils.repair_llm_raw_output import extract_state_value_from_output
if TYPE_CHECKING:
from metagpt.environment import Environment # noqa: F401
PREFIX_TEMPLATE = """You are a {profile}, named {name}, your goal is {goal}. """
CONSTRAINT_TEMPLATE = "the constraint is {constraints}. "
@ -455,8 +454,8 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act
while actions_taken < self.rc.max_react_loop:
# think
await self._think()
if self.rc.todo is None:
has_todo = await self._think()
if not has_todo:
break
# act
logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
@ -492,6 +491,8 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
await self.planner.process_task_result(task_result)
rsp = self.planner.get_useful_memories()[0] # return the completed plan as a response
rsp.role = "assistant"
rsp.sent_from = self._setting
self.rc.memory.add(rsp) # add to persistent memory

View file

@ -349,6 +349,7 @@ class Task(BaseModel):
result: str = ""
is_success: bool = False
is_finished: bool = False
assignee: str = ""
def reset(self):
self.code = ""
@ -490,7 +491,11 @@ class Plan(BaseModel):
Returns:
None
"""
assert not self.has_task_id(new_task.task_id), "Task already in current plan, use replace_task instead"
# assert not self.has_task_id(new_task.task_id), "Task already in current plan, use replace_task instead"
if self.has_task_id(new_task.task_id):
logger.warning(
"Task already in current plan, should use replace_task instead. Overwriting the existing task."
)
assert all(
[self.has_task_id(dep_id) for dep_id in new_task.dependent_task_ids]
@ -505,6 +510,10 @@ class Plan(BaseModel):
return task_id in self.task_map
def _update_current_task(self):
self.tasks = self._topological_sort(self.tasks)
# Update the task map for quick access to tasks by ID
self.task_map = {task.task_id: task for task in self.tasks}
current_task_id = ""
for task in self.tasks:
if not task.is_finished:
@ -528,6 +537,10 @@ class Plan(BaseModel):
self.current_task.is_finished = True
self._update_current_task() # set to next task
def is_plan_finished(self) -> bool:
"""Check if all tasks are finished"""
return all(task.is_finished for task in self.tasks)
def get_finished_tasks(self) -> list[Task]:
"""return all finished tasks in correct linearized order

View file

@ -0,0 +1,138 @@
from pydantic import BaseModel
class ExpRetriever(BaseModel):
"""interface for experience retriever"""
def retrieve(self, context: str) -> str:
raise NotImplementedError
class SimpleExpRetriever(ExpRetriever):
"""A simple experience retriever that returns manually crafted examples."""
EXAMPLE: str = """
## example 1
User Requirement: Create a cli snake game
Explanation: The requirement is about software development. Assign each tasks to a different team member based on their expertise.
```json
[
{
"command_name": "append_task",
"args": {
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Create a product requirement document (PRD) outlining the features, user interface, and user experience of the CLI snake game.",
"assignee": "Alice"
}
},
{
"command_name": "append_task",
"args": {
"task_id": "2",
"dependent_task_ids": ["1"],
"instruction": "Design the software architecture for the CLI snake game, including the choice of programming language, libraries, and data flow.",
"assignee": "Bob"
}
},
{
"command_name": "append_task",
"args": {
"task_id": "3",
"dependent_task_ids": ["2"],
"instruction": "Break down the architecture into manageable tasks, identify task dependencies, and prepare a detailed task list for implementation.",
"assignee": "Eve"
}
},
{
"command_name": "append_task",
"args": {
"task_id": "4",
"dependent_task_ids": ["3"],
"instruction": "Implement the core game logic for the CLI snake game, including snake movement, food generation, and score tracking.",
"assignee": "Alex"
}
},
{
"command_name": "append_task",
"args": {
"task_id": "5",
"dependent_task_ids": ["4"],
"instruction": "Write comprehensive tests for the game logic and user interface to ensure functionality and reliability.",
"assignee": "Edward"
}
},
{
"command_name": "publish_message",
"args": {
"content": "User request to create a cli snake game. Please create a product requirement document (PRD) outlining the features, user interface, and user experience of the snake game.",
"send_to": "Alice"
}
}
]
```
## example 2
User Requirement: Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.
Explanation: DON'T decompose requirement if it is a DATA-RELATED task, assign a single task directly to Data Analyst David. He will manage the decomposition and implementation.
```json
[
{
"command_name": "append_task",
"args": {
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
"assignee": "David"
}
},
{
"command_name": "publish_message",
"args": {
"content": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
"send_to": "David"
}
}
]
```
## example 3
Conversation History:
[
...,
{'role': 'assistant', 'content': 'from Alice(Product Manager) to {'Bob'}: {'docs': {'20240424153821.json': {'root_path': 'docs/prd', 'filename': '20240424153821.json', 'content': '{"Language":"en_us","Programming Language":"Python","Original Requirements":"create a cli snake game","Project Name":"snake_game","Product Goals":["Develop an intuitive and addictive snake game",...], ...}}}}},
]
Explanation: You received a message from Alice, the Product Manager, that she has completed the PRD, use finish_current_task to mark her task as finished and moves the plan to the next task. Based on plan status, next task is for Bob (Architect), publish a message asking him to start. The message content should contain important path info.
```json
[
{
"command_name": "finish_current_task",
"args": {}
},
{
"command_name": "publish_message",
"args": {
"content": "Please design the software architecture for the snake game based on the PRD created by Alice. The PRD is at 'docs/prd/20240424153821.json'. Include the choice of programming language, libraries, and data flow, etc.",
"send_to": "Bob"
}
}
]
```
## example 4
User Question: how does the project go?
Explanation: The user is asking for a general update on the project status. Give a straight answer about the current task the team is working on and provide a summary of the completed tasks.
```json
[
{
"command_name": "reply_to_human",
"args": {
"content": "The team is currently working on ... We have completed ...",
}
}
]
```
"""
def retrieve(self, context: str = "") -> str:
return self.EXAMPLE

View file

@ -0,0 +1,61 @@
from enum import Enum
from pydantic import BaseModel
class CommandDef(BaseModel):
name: str
signature: str = ""
desc: str = ""
class Command(Enum):
# commands for planning
APPEND_TASK = CommandDef(
name="append_task",
signature="append_task(task_id: str, dependent_task_ids: list[str], instruction: str, assignee: str)",
desc="Append a new task with task_id (number) to the end of existing task sequences. If dependent_task_ids is not empty, the task will depend on the tasks with the ids in the list.",
)
RESET_TASK = CommandDef(
name="reset_task",
signature="reset_task(task_id: str)",
desc="Reset a task based on task_id, i.e. set Task.is_finished=False and request redo. This also resets all tasks depending on it.",
)
REPLACE_TASK = CommandDef(
name="replace_task",
signature="replace_task(task_id: str, new_dependent_task_ids: list[str], new_instruction: str, new_assignee: str)",
desc="Replace an existing task (can be current task) based on task_id, and reset all tasks depending on it.",
)
FINISH_CURRENT_TASK = CommandDef(
name="finish_current_task",
signature="finish_current_task()",
desc="Finishes current task, set Task.is_finished=True, set current task to next task",
)
# commands for env interaction
PUBLISH_MESSAGE = CommandDef(
name="publish_message",
signature="publish_message(content: str, send_to: str)",
desc="Publish a message to a team member, use member name to fill send_to args. You may copy the full original content or add additional information from upstream. This will make team members start their work. DONT omit any necessary info such as path, link, environment from original content to team members because you are their sole info source.",
)
REPLY_TO_HUMAN = CommandDef(
name="reply_to_human",
signature="reply_to_human(content: str)",
desc="Reply to human user with the content provided. Use this when you have a clear answer or solution to the user's question.",
)
ASK_HUMAN = CommandDef(
name="ask_human",
signature="ask_human(question: str)",
desc="Use this when you fail the current task or if you are unsure of the situation encountered. Your response should contain a brief summary of your situation, ended with a clear and concise question.",
)
# common commands
PASS = CommandDef(
name="pass",
signature="pass",
desc="Pass and do nothing, if you don't think the plan needs to be updated nor a message to be published or forwarded. The reasons can be the latest message is unnecessary or obsolete, or you want to wait for more information before making a move.",
)
@property
def cmd_name(self):
return self.value.name

View file

@ -6,7 +6,7 @@ from metagpt.actions.di.ask_review import AskReview
@pytest.mark.asyncio
async def test_ask_review(mocker):
mock_review_input = "confirm"
mocker.patch("builtins.input", return_value=mock_review_input)
mocker.patch("metagpt.actions.di.ask_review.get_human_input", return_value=mock_review_input)
rsp, confirmed = await AskReview().run()
assert rsp == mock_review_input
assert confirmed

View file

@ -0,0 +1,169 @@
import pytest
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.roles import (
Architect,
Engineer,
ProductManager,
ProjectManager,
QaEngineer,
)
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.roles.di.team_leader import TeamLeader
from metagpt.schema import Message
@pytest.fixture
def env():
test_env = MGXEnv()
tl = TeamLeader()
da = DataInterpreter(
name="David",
profile="Data Analyst",
goal="Take on any data-related tasks, such as data analysis, machine learning, deep learning, web browsing, web scraping, web searching, web deployment, terminal operation, git operation, etc.",
react_mode="react",
)
test_env.add_roles(
[
tl,
ProductManager(),
Architect(),
ProjectManager(),
Engineer(n_borg=5, use_code_review=True),
QaEngineer(),
da,
]
)
return test_env
@pytest.mark.asyncio
async def test_plan_for_software_requirement(env):
requirement = "create a 2048 game"
tl = env.get_role("Team Leader")
env.publish_message(Message(content=requirement, send_to=tl.name))
await tl.run()
# TL should assign tasks to 5 members first, then send message to the first assignee, 6 commands in total
assert len(tl.commands) == 6
plan_cmd = tl.commands[:5]
route_cmd = tl.commands[5]
task_assignment = [task["args"]["assignee"] for task in plan_cmd]
assert task_assignment == [
ProductManager().name,
Architect().name,
ProjectManager().name,
Engineer().name,
QaEngineer().name,
]
assert route_cmd["command_name"] == "publish_message"
assert route_cmd["args"]["send_to"] == ProductManager().name
@pytest.mark.asyncio
async def test_plan_for_data_related_requirement(env):
requirement = "I want to use yolov5 for target detection, yolov5 all the information from the following link, please help me according to the content of the link (https://github.com/ultralytics/yolov5), set up the environment and download the model parameters, and finally provide a few pictures for inference, the inference results will be saved!"
tl = env.get_role("Team Leader")
env.publish_message(Message(content=requirement, send_to=tl.name))
await tl.run()
# TL should assign 1 task to Data Analyst and send message to it
assert len(tl.commands) == 2
plan_cmd = tl.commands[0]
route_cmd = tl.commands[-1]
da = env.get_role("Data Analyst")
assert plan_cmd["command_name"] == "append_task"
assert plan_cmd["args"]["assignee"] == da.name
assert route_cmd["command_name"] == "publish_message"
assert "https://github.com" in route_cmd["args"]["content"] # necessary info must be in the message
assert route_cmd["args"]["send_to"] == da.name
@pytest.mark.asyncio
async def test_plan_for_mixed_requirement(env):
requirement = "Search the web for the new game 2048X, then replicate it"
tl = env.get_role("Team Leader")
env.publish_message(Message(content=requirement, send_to=tl.name))
await tl.run()
# TL should assign 6 tasks, first to Data Analyst to search the web, following by the software team sequence
# TL should send message to Data Analyst after task assignment
assert len(tl.commands) == 7
plan_cmd = tl.commands[:6]
route_cmd = tl.commands[-1]
task_assignment = [task["args"]["assignee"] for task in plan_cmd]
da = env.get_role("Data Analyst")
assert task_assignment == [
da.name,
ProductManager().name,
Architect().name,
ProjectManager().name,
Engineer().name,
QaEngineer().name,
]
assert route_cmd["command_name"] == "publish_message"
assert route_cmd["args"]["send_to"] == da.name
PRD_MSG_CONTENT = """{'docs': {'20240424153821.json': {'root_path': 'docs/prd', 'filename': '20240424153821.json', 'content': '{"Language":"en_us","Programming Language":"Python","Original Requirements":"create a 2048 game","Project Name":"game_2048","Product Goals":["Develop an intuitive and addictive 2048 game variant","Ensure the game is accessible and performs well on various devices","Design a visually appealing and modern user interface"],"User Stories":["As a player, I want to be able to undo my last move so I can correct mistakes","As a player, I want to see my high scores to track my progress over time","As a player, I want to be able to play the game without any internet connection"],"Competitive Analysis":["2048 Original: Classic gameplay, minimalistic design, lacks social sharing features","2048 Hex: Unique hexagon board, but not mobile-friendly","2048 Multiplayer: Offers real-time competition, but overwhelming ads","2048 Bricks: Innovative gameplay with bricks, but poor performance on older devices","2048.io: Multiplayer battle royale mode, but complicated UI for new players","2048 Animated: Animated tiles add fun, but the game consumes a lot of battery","2048 3D: 3D version of the game, but has a steep learning curve"],"Competitive Quadrant Chart":"quadrantChart\\n title \\"User Experience and Feature Set of 2048 Games\\"\\n x-axis \\"Basic Features\\" --> \\"Rich Features\\"\\n y-axis \\"Poor Experience\\" --> \\"Great Experience\\"\\n quadrant-1 \\"Need Improvement\\"\\n quadrant-2 \\"Feature-Rich but Complex\\"\\n quadrant-3 \\"Simplicity with Poor UX\\"\\n quadrant-4 \\"Balanced\\"\\n \\"2048 Original\\": [0.2, 0.7]\\n \\"2048 Hex\\": [0.3, 0.4]\\n \\"2048 Multiplayer\\": [0.6, 0.5]\\n \\"2048 Bricks\\": [0.4, 0.3]\\n \\"2048.io\\": [0.7, 0.4]\\n \\"2048 Animated\\": [0.5, 0.6]\\n \\"2048 3D\\": [0.6, 0.3]\\n \\"Our Target Product\\": [0.8, 0.9]","Requirement Analysis":"The game must be engaging and retain players, which requires a balance of simplicity and challenge. Accessibility on various devices is crucial for a wider reach. A modern UI is needed to attract and retain the modern user. The ability to play offline is important for users on the go. High score tracking and the ability to undo moves are features that will enhance user experience.","Requirement Pool":[["P0","Implement core 2048 gameplay mechanics"],["P0","Design responsive UI for multiple devices"],["P1","Develop undo move feature"],["P1","Integrate high score tracking system"],["P2","Enable offline gameplay capability"]],"UI Design draft":"The UI will feature a clean and modern design with a minimalist color scheme. The game board will be center-aligned with smooth tile animations. Score and high score will be displayed at the top. Undo and restart buttons will be easily accessible. The design will be responsive to fit various screen sizes.","Anything UNCLEAR":"The monetization strategy for the game is not specified. Further clarification is needed on whether the game should include advertisements, in-app purchases, or be completely free."}'}}}"""
DESIGN_CONTENT = """{"docs":{"20240424214432.json":{"root_path":"docs/system_design","filename":"20240424214432.json","content":"{\\"Implementation approach\\":\\"We will develop the 2048 game using Python, leveraging the pygame library for rendering the game interface and handling user input. This library is suitable for creating games and is widely used in the open-source community. We will ensure that the game logic is separated from the UI code to maintain a clean architecture. The game will be designed to be responsive and accessible on both desktop and mobile devices using scalable dimensions and touch-friendly controls.\\",\\"File list\\":[\\"main.py\\",\\"game.py\\",\\"ui.py\\",\\"constants.py\\",\\"logic.py\\"],\\"Data structures and interfaces\\":\\"\\\\nclassDiagram\\\\n class Main {\\\\n +main() void\\\\n }\\\\n class Game {\\\\n -UI ui\\\\n -Logic logic\\\\n +start_game() void\\\\n +restart_game() void\\\\n }\\\\n class UI {\\\\n -current_score int\\\\n -high_score int\\\\n +draw_board(board: list) void\\\\n +update_score(score: int) void\\\\n +show_game_over() void\\\\n }\\\\n class Logic {\\\\n -board list\\\\n -score int\\\\n +move(direction: str) bool\\\\n +check_game_over() bool\\\\n +get_current_score() int\\\\n +get_high_score() int\\\\n +reset_game() void\\\\n }\\\\n class Constants {\\\\n +BOARD_SIZE int\\\\n +INITIAL_TILES int\\\\n }\\\\n Main --> Game\\\\n Game --> UI\\\\n Game --> Logic\\\\n\\",\\"Program call flow\\":\\"\\\\nsequenceDiagram\\\\n participant M as Main\\\\n participant G as Game\\\\n participant UI as UI\\\\n participant L as Logic\\\\n M->>G: start_game()\\\\n loop Game Loop\\\\n G->>UI: draw_board(board)\\\\n G->>L: move(direction)\\\\n alt if move successful\\\\n L-->>G: return true\\\\n G->>UI: update_score(score)\\\\n else if move not successful\\\\n L-->>G: return false\\\\n end\\\\n G->>L: check_game_over()\\\\n alt if game over\\\\n L-->>G: return true\\\\n G->>UI: show_game_over()\\\\n G->>G: restart_game()\\\\n else\\\\n L-->>G: return false\\\\n end\\\\n end\\\\n\\",\\"Anything UNCLEAR\\":\\"Clarification needed on the specific touch-friendly controls for mobile devices and how they will be implemented using pygame.\\"}"}}}"""
@pytest.mark.asyncio
async def test_plan_update_and_routing(env):
requirement = "create a 2048 game"
tl = env.get_role("Team Leader")
env.publish_message(Message(content=requirement))
await tl.run()
# Assuming Product Manager finishes its task
env.publish_message(Message(content=PRD_MSG_CONTENT, role="Alice(Product Manager)", sent_from="Alice"))
await tl.run()
# TL should mark current task as finished, and forward Product Manager's message to Architect
# Current task should be updated to the second task
plan_cmd = tl.commands[0]
route_cmd = tl.commands[-1]
assert plan_cmd["command_name"] == "finish_current_task"
assert route_cmd["command_name"] == "publish_message"
assert route_cmd["args"]["send_to"] == Architect().name
assert tl.planner.plan.current_task_id == "2"
# Next step, assuming Architect finishes its task
env.publish_message(Message(content=DESIGN_CONTENT, role="Bob(Architect)", sent_from="Bob"))
await tl.run()
plan_cmd = tl.commands[0]
route_cmd = tl.commands[-1]
assert plan_cmd["command_name"] == "finish_current_task"
assert route_cmd["command_name"] == "publish_message"
assert route_cmd["args"]["send_to"] == ProjectManager().name
assert tl.planner.plan.current_task_id == "3"
@pytest.mark.asyncio
async def test_reply_to_human(env):
requirement = "create a 2048 game"
tl = env.get_role("Team Leader")
env.publish_message(Message(content=requirement))
await tl.run()
# Assuming Product Manager finishes its task
env.publish_message(Message(content=PRD_MSG_CONTENT, role="Alice(Product Manager)", sent_from="Alice"))
await tl.run()
# Human inquires about the progress
env.publish_message(Message(content="Who is working? How does the project go?"))
await tl.run()
assert tl.commands[0]["command_name"] == "reply_to_human"

View file

@ -10,7 +10,6 @@ import json
import pytest
from metagpt.actions import WritePRD
from metagpt.actions.prepare_documents import PrepareDocuments
from metagpt.const import REQUIREMENT_FILENAME
from metagpt.context import Context
from metagpt.logs import logger
@ -30,12 +29,8 @@ async def test_product_manager(new_filename):
rsp = await product_manager.run(MockMessages.req)
assert context.git_repo
assert context.repo
assert rsp.cause_by == any_to_str(PrepareDocuments)
assert REQUIREMENT_FILENAME in context.repo.docs.changed_files
# write prd
rsp = await product_manager.run(rsp)
assert rsp.cause_by == any_to_str(WritePRD)
assert REQUIREMENT_FILENAME in context.repo.docs.changed_files
logger.info(rsp)
assert len(rsp.content) > 0
doc = list(rsp.instruct_content.docs.values())[0]