abstract role zero from engineer2, config engineer2

This commit is contained in:
garylin2099 2024-06-03 22:45:27 +08:00
parent 8df76ce612
commit 9a9d342bbb
3 changed files with 211 additions and 169 deletions

View file

@ -5,11 +5,11 @@ class Task(BaseModel):
dependent_task_ids: list[str] = []
instruction: str = ""
task_type: str = ""
assignee: str = "David"
assignee: str = ""
# Available Commands
{available_commands}
Special Command: Use {{"command_name": "Common.pass"}} to do nothing and {{"command_name": "Common.end"}} to indicate completion of all requirements and the end of actions.
Special Command: Use {{"command_name": "pass"}} to do nothing and {{"command_name": "end"}} to indicate completion of all requirements and the end of actions.
# Current Plan
{plan_status}
@ -35,8 +35,8 @@ Pay close attention to the Example provided, you can reuse the example for your
You may use any of the available commands to create a plan or update the plan. You may output mutiple commands, they will be executed sequentially.
If you finish current task, you will automatically take the next task in the existing plan, use Plan.finish_task, DON'T append a new task.
# Your commands in a json array, in the following output format, always output ONE and ONLY ONE json array, if there is nothing to do, use the pass command:
Some text indicating your thoughts, such as how you should update the plan status, respond to inquiry, or seek for help. Then a json array of commands.
# Your commands in a json array, in the following output format. If there is nothing to do, use the pass or end command:
Some text indicating your thoughts, such as how you should update the plan status, respond to inquiry, or seek for help. Then a json array of commands. You must output ONE and ONLY ONE json array. DON'T output multiple json arrays with thoughts between them.
```json
[
{{

View file

@ -1,188 +1,40 @@
from __future__ import annotations
import asyncio
import json
import traceback
from typing import Literal
from pydantic import model_validator
from metagpt.actions import Action
from metagpt.actions.di.write_analysis_code import WriteAnalysisCode
from metagpt.logs import logger
from metagpt.prompts.di.engineer2 import CMD_PROMPT
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.schema import Message, TaskResult
from metagpt.strategy.experience_retriever import KeywordExpRetriever
from metagpt.strategy.planner import Planner
from metagpt.roles.di.role_zero import RoleZero
from metagpt.tools.libs.editor import Editor
from metagpt.tools.tool_recommend import BM25ToolRecommender
from metagpt.utils.common import CodeParser
from test3 import design_doc_2048, design_doc_snake, task_doc_2048, task_doc_snake
class Engineer2(DataInterpreter):
def dummy_func(**kwargs):
pass
class Engineer2(RoleZero):
name: str = "Alex"
profile: str = "Engineer"
goal: str = ""
react_mode: Literal["react"] = "react"
max_react_loop: int = 20 # used for react mode
# task_result: TaskResult = None
command_rsp: str = "" # the raw string containing the commands
commands: list[dict] = [] # commands to be executed
tools: str = ["Plan", "Editor:write,read,write_content", "MGXEnv:ask_human,reply_to_human"]
editor: Editor = Editor()
@model_validator(mode="after")
def set_plan_and_tool(self) -> "DataInterpreter":
# We force using this parameter for DataAnalyst
assert self.react_mode == "react"
assert self.auto_run
assert self.use_plan
# Roughly the same part as DataInterpreter.set_plan_and_tool
self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run)
if self.tools and not self.tool_recommender:
self.tool_recommender = BM25ToolRecommender(tools=self.tools, force=True)
self.set_actions([WriteAnalysisCode])
self._set_state(0)
# HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode
self.planner = Planner(goal="", working_memory=self.rc.working_memory, auto_run=True)
return self
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
if not self.rc.todo and not self.rc.news:
return False
self._set_state(0)
example = ""
if not self.planner.plan.goal:
self.user_requirement = self.get_memories()[-1].content
self.planner.plan.goal = self.user_requirement
example = KeywordExpRetriever().retrieve(self.user_requirement)
else:
# self.working_memory.add_batch(self.rc.news)
self.rc.memory.add_batch(self.rc.news)
# TODO: implement experience retrieval in multi-round setting
# if self.planner.plan.current_task:
# experience = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task")
# if experience and experience not in [msg.content for msg in self.rc.memory.get()]:
# exp_msg = Message(content=experience, role="assistant")
# self.rc.memory.add(exp_msg)
# example = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task")
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
for task in plan_status["tasks"]:
task.pop("code")
task.pop("result")
task.pop("is_success")
# print(plan_status)
current_task = (
self.planner.plan.current_task.model_dump(exclude=["code", "result", "is_success"])
if self.planner.plan.current_task
else ""
)
tools = await self.tool_recommender.recommend_tools()
tool_info = json.dumps({tool.name: tool.schemas for tool in tools})
prompt = CMD_PROMPT.format(
plan_status=plan_status,
current_task=current_task,
example=example,
# available_commands=prepare_command_prompt(self.available_commands),
available_commands=tool_info,
)
# context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")])
context = self.llm.format_msg(self.rc.memory.get(10) + [Message(content=prompt, role="user")])
print(*context, sep="\n" + "*" * 5 + "\n")
self.command_rsp = await self.llm.aask(context)
# self.rc.working_memory.add(Message(content=rsp, role="assistant"))
self.rc.memory.add(Message(content=self.command_rsp, role="assistant"))
return True
async def _act(self) -> Message:
try:
commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=self.command_rsp))
except Exception as e:
tb = traceback.format_exc()
print(tb)
error_msg = Message(content=str(e), role="user")
self.rc.memory.add(error_msg)
return error_msg
outputs = await self.run_commands(commands)
# self.rc.working_memory.add(Message(content=outputs, role="user"))
self.rc.memory.add(Message(content=outputs, role="user"))
return Message(content="Task completed", role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode)
async def _react(self) -> Message:
actions_taken = 0
rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act
while actions_taken < self.rc.max_react_loop:
# NOTE: difference here, keep observing within react
await self._observe()
# think
has_todo = await self._think()
if not has_todo:
break
# act
logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
rsp = await self._act()
actions_taken += 1
return rsp # return output from the last action
async def run_commands(self, commands) -> list:
tool_execute_map = {
def set_tool_execution_map(self) -> "RoleZero":
self.tool_execute_map = {
"Plan.append_task": self.planner.plan.append_task,
"Plan.reset_task": self.planner.plan.reset_task,
"Plan.replace_task": self.planner.plan.replace_task,
"Editor.write": self.editor.write,
"Editor.write_content": self.editor.write_content,
"Editor.read": self.editor.read,
"MGXEnv.ask_human": dummy_func,
"MGXEnv.reply_to_human": dummy_func,
}
return self
# print(*commands, sep="\n")
is_success = True
outputs = ["Commands executed."]
for cmd in commands:
if cmd["command_name"] in tool_execute_map:
try:
output = tool_execute_map[cmd["command_name"]](**cmd["args"])
if output:
outputs.append(f"Output for {cmd['command_name']}: {str(output)}")
except Exception as e:
tb = traceback.format_exc()
print(e, tb)
outputs.append(tb)
is_success = False
break # Stop executing if any command fails
outputs = "\n\n".join(outputs)
# Handle finish_current_task and end individually as a last step
for cmd in commands:
if (
is_success
and cmd["command_name"] == "Plan.finish_current_task"
and not self.planner.plan.is_plan_finished()
):
task_result = TaskResult(code=str(commands), result=outputs, is_success=is_success)
self.planner.plan.current_task.update_task_result(task_result=task_result)
self.planner.plan.finish_current_task()
# self.rc.working_memory.clear()
elif cmd["command_name"] == "Common.end":
self._set_state(-1)
return outputs
WINE_REQ = "Run data analysis on sklearn Wine recognition dataset, and train a model to predict wine class (20% as validation), and show validation accuracy."
GAME_REQ_2048 = f"""
Create a 2048 game, follow the design doc and task doc. Write your code under /Users/gary/Files/temp/workspace/2048_game/src.
@ -202,17 +54,23 @@ Design doc:
Task doc:
{design_doc_snake}
"""
GAME_REQ_2048_NO_DOC = """
Create a 2048 game with pygame. Write your code under /Users/gary/Files/temp/workspace/2048_game/src.
Consider what files you will write, break down the requests to multiple tasks and write one file in each task.
After writing all codes, write a code review for the codes, make improvement or adjustment based on the review.
Notice: You MUST implement the full code, don't leave comment without implementation!
"""
GAME_INC_REQ_2048 = """
I found an issue with the 2048 code: when tiles are merged, no new tiles pop up.
Write code review for the codes (game.py, main.py, ui.py) under under /Users/gary/Files/temp/workspace/2048_game_bugs/src.
Then correct any issues you find. You can review all code in one time, and solve issues in one time.
"""
GAME_INC_REQ_SNAKE = """
Based on the design doc at /Users/gary/Files/temp/workspace/snake_game_bugs/docs/20240513200737.json,
Found this issue, TypeError: generate_new_position() missing 1 required positional argument: 'snake_body'
Write code review for the codes (food.py, game.py, main.py, snake.py, ui.py) under under /Users/gary/Files/temp/workspace/snake_game_bugs/src.
Then correct any issues you find. You can read the design doc first, then review all code in one time, and solve issues in one time.
Then correct any issues you find. You can review all code in one time, and solve issues in one time.
"""
if __name__ == "__main__":
engineer2 = Engineer2(tools=["Plan", "Editor:write,read,write_content", "MGXEnv:ask_human,reply_to_human"])
asyncio.run(engineer2.run(GAME_INC_REQ_2048))
engineer2 = Engineer2()
asyncio.run(engineer2.run(GAME_REQ_2048_NO_DOC))

View file

@ -0,0 +1,184 @@
from __future__ import annotations
import inspect
import json
import traceback
from typing import Literal
from pydantic import model_validator
from metagpt.actions import Action
from metagpt.actions.di.run_command import RunCommand
from metagpt.logs import logger
from metagpt.prompts.di.role_zero import CMD_PROMPT
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.strategy.experience_retriever import KeywordExpRetriever
from metagpt.strategy.planner import Planner
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.utils.common import CodeParser
class RoleZero(Role):
name: str = "Zero"
profile: str = "RoleZero"
goal: str = ""
system_msg: str = ""
cmd_prompt: str = CMD_PROMPT
react_mode: Literal["react"] = "react"
max_react_loop: int = 20 # used for react mode
user_requirement: str = ""
command_rsp: str = "" # the raw string containing the commands
commands: list[dict] = [] # commands to be executed
memory_k: int = 20 # number of memories (messages) to use as historical context
tools: list[str] = [] # Use special symbol ["<all>"] to indicate use of all registered tools
tool_recommender: ToolRecommender = None
tool_execution_map: dict[str, callable] = {}
@model_validator(mode="after")
def set_plan_and_tool(self) -> "RoleZero":
# We force using this parameter for DataAnalyst
assert self.react_mode == "react"
# Roughly the same part as DataInterpreter.set_plan_and_tool
self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop)
if self.tools and not self.tool_recommender:
self.tool_recommender = BM25ToolRecommender(tools=self.tools, force=True)
self.set_actions([RunCommand])
self._set_state(0)
# HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode
self.planner = Planner(goal="", working_memory=self.rc.working_memory, auto_run=True)
return self
@model_validator(mode="after")
def set_tool_execution_map(self) -> "RoleZero":
raise NotImplementedError
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
if not self.rc.todo and not self.rc.news:
return False
self._set_state(0)
example = ""
if not self.planner.plan.goal:
self.user_requirement = self.get_memories()[-1].content
self.planner.plan.goal = self.user_requirement
example = KeywordExpRetriever().retrieve(self.user_requirement)
else:
self.rc.memory.add_batch(self.rc.news)
# TODO: implement experience retrieval in multi-round setting
# if self.planner.plan.current_task:
# experience = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task")
# if experience and experience not in [msg.content for msg in self.rc.memory.get()]:
# exp_msg = Message(content=experience, role="assistant")
# self.rc.memory.add(exp_msg)
# example = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task")
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
for task in plan_status["tasks"]:
task.pop("code")
task.pop("result")
task.pop("is_success")
# print(plan_status)
current_task = (
self.planner.plan.current_task.model_dump(exclude=["code", "result", "is_success"])
if self.planner.plan.current_task
else ""
)
tools = await self.tool_recommender.recommend_tools()
tool_info = json.dumps({tool.name: tool.schemas for tool in tools})
prompt = self.cmd_prompt.format(
plan_status=plan_status,
current_task=current_task,
example=example,
available_commands=tool_info,
)
context = self.llm.format_msg(self.rc.memory.get(self.memory_k) + [Message(content=prompt, role="user")])
print(*context, sep="\n" + "*" * 5 + "\n")
self.command_rsp = await self.llm.aask(context)
self.rc.memory.add(Message(content=self.command_rsp, role="assistant"))
return True
async def _act(self) -> Message:
try:
commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=self.command_rsp))
except Exception as e:
tb = traceback.format_exc()
print(tb)
error_msg = Message(content=str(e), role="user")
self.rc.memory.add(error_msg)
return error_msg
outputs = await self._run_commands(commands)
self.rc.memory.add(Message(content=outputs, role="user"))
return Message(content="Task completed", role="assistant", sent_from=self._setting, cause_by=RunCommand)
async def _react(self) -> Message:
actions_taken = 0
rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act
while actions_taken < self.rc.max_react_loop:
# NOTE: difference here, keep observing within react
await self._observe()
# think
has_todo = await self._think()
if not has_todo:
break
# act
logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
rsp = await self._act()
actions_taken += 1
return rsp # return output from the last action
async def _run_commands(self, commands) -> list:
outputs = []
for cmd in commands:
# handle special command first
if await self._run_special_command(cmd):
continue
# run command as specified by tool_execute_map
if cmd["command_name"] in self.tool_execute_map:
tool_obj = self.tool_execute_map[cmd["command_name"]]
output = f"Command {cmd['command_name']} executed"
try:
if inspect.iscoroutinefunction(tool_obj):
tool_output = await tool_obj(**cmd["args"])
else:
tool_output = tool_obj(**cmd["args"])
if tool_output:
output += f": {str(tool_output)}"
outputs.append(output)
except Exception as e:
tb = traceback.format_exc()
print(e, tb)
outputs.append(output + f": {tb}")
break # Stop executing if any command fails
else:
outputs.append(f"Command {cmd['command_name']} not found.")
break
outputs = "\n\n".join(outputs)
return outputs
async def _run_special_command(self, cmd) -> bool:
"""command requiring special check or parsing"""
is_special_cmd = cmd["command_name"] in ["Plan.finish_current_task", "Common.end"]
if cmd["command_name"] == "Plan.finish_current_task" and not self.planner.plan.is_plan_finished():
# task_result = TaskResult(code=str(commands), result=outputs, is_success=is_success)
# self.planner.plan.current_task.update_task_result(task_result=task_result)
self.planner.plan.finish_current_task()
elif cmd["command_name"] == "end":
self._set_state(-1)
return is_special_cmd