From 9a9d342bbb66847bf647db1d50650ffce77ec7ba Mon Sep 17 00:00:00 2001 From: garylin2099 Date: Mon, 3 Jun 2024 22:45:27 +0800 Subject: [PATCH] abstract role zero from engineer2, config engineer2 --- .../prompts/di/{engineer2.py => role_zero.py} | 8 +- metagpt/roles/di/engineer2.py | 188 +++--------------- metagpt/roles/di/role_zero.py | 184 +++++++++++++++++ 3 files changed, 211 insertions(+), 169 deletions(-) rename metagpt/prompts/di/{engineer2.py => role_zero.py} (82%) create mode 100644 metagpt/roles/di/role_zero.py diff --git a/metagpt/prompts/di/engineer2.py b/metagpt/prompts/di/role_zero.py similarity index 82% rename from metagpt/prompts/di/engineer2.py rename to metagpt/prompts/di/role_zero.py index 656b48eef..4fececf7f 100644 --- a/metagpt/prompts/di/engineer2.py +++ b/metagpt/prompts/di/role_zero.py @@ -5,11 +5,11 @@ class Task(BaseModel): dependent_task_ids: list[str] = [] instruction: str = "" task_type: str = "" - assignee: str = "David" + assignee: str = "" # Available Commands {available_commands} -Special Command: Use {{"command_name": "Common.pass"}} to do nothing and {{"command_name": "Common.end"}} to indicate completion of all requirements and the end of actions. +Special Command: Use {{"command_name": "pass"}} to do nothing and {{"command_name": "end"}} to indicate completion of all requirements and the end of actions. # Current Plan {plan_status} @@ -35,8 +35,8 @@ Pay close attention to the Example provided, you can reuse the example for your You may use any of the available commands to create a plan or update the plan. You may output mutiple commands, they will be executed sequentially. If you finish current task, you will automatically take the next task in the existing plan, use Plan.finish_task, DON'T append a new task. -# Your commands in a json array, in the following output format, always output ONE and ONLY ONE json array, if there is nothing to do, use the pass command: -Some text indicating your thoughts, such as how you should update the plan status, respond to inquiry, or seek for help. Then a json array of commands. +# Your commands in a json array, in the following output format. If there is nothing to do, use the pass or end command: +Some text indicating your thoughts, such as how you should update the plan status, respond to inquiry, or seek for help. Then a json array of commands. You must output ONE and ONLY ONE json array. DON'T output multiple json arrays with thoughts between them. ```json [ {{ diff --git a/metagpt/roles/di/engineer2.py b/metagpt/roles/di/engineer2.py index aaadbf191..8618e0c47 100644 --- a/metagpt/roles/di/engineer2.py +++ b/metagpt/roles/di/engineer2.py @@ -1,188 +1,40 @@ from __future__ import annotations import asyncio -import json -import traceback -from typing import Literal from pydantic import model_validator -from metagpt.actions import Action -from metagpt.actions.di.write_analysis_code import WriteAnalysisCode -from metagpt.logs import logger -from metagpt.prompts.di.engineer2 import CMD_PROMPT -from metagpt.roles.di.data_interpreter import DataInterpreter -from metagpt.schema import Message, TaskResult -from metagpt.strategy.experience_retriever import KeywordExpRetriever -from metagpt.strategy.planner import Planner +from metagpt.roles.di.role_zero import RoleZero from metagpt.tools.libs.editor import Editor -from metagpt.tools.tool_recommend import BM25ToolRecommender -from metagpt.utils.common import CodeParser from test3 import design_doc_2048, design_doc_snake, task_doc_2048, task_doc_snake -class Engineer2(DataInterpreter): +def dummy_func(**kwargs): + pass + + +class Engineer2(RoleZero): name: str = "Alex" profile: str = "Engineer" goal: str = "" - react_mode: Literal["react"] = "react" - max_react_loop: int = 20 # used for react mode - # task_result: TaskResult = None - command_rsp: str = "" # the raw string containing the commands - commands: list[dict] = [] # commands to be executed + tools: str = ["Plan", "Editor:write,read,write_content", "MGXEnv:ask_human,reply_to_human"] + editor: Editor = Editor() @model_validator(mode="after") - def set_plan_and_tool(self) -> "DataInterpreter": - # We force using this parameter for DataAnalyst - assert self.react_mode == "react" - assert self.auto_run - assert self.use_plan - - # Roughly the same part as DataInterpreter.set_plan_and_tool - self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run) - if self.tools and not self.tool_recommender: - self.tool_recommender = BM25ToolRecommender(tools=self.tools, force=True) - self.set_actions([WriteAnalysisCode]) - self._set_state(0) - - # HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode - self.planner = Planner(goal="", working_memory=self.rc.working_memory, auto_run=True) - - return self - - async def _think(self) -> bool: - """Useful in 'react' mode. Use LLM to decide whether and what to do next.""" - if not self.rc.todo and not self.rc.news: - return False - - self._set_state(0) - example = "" - if not self.planner.plan.goal: - self.user_requirement = self.get_memories()[-1].content - self.planner.plan.goal = self.user_requirement - example = KeywordExpRetriever().retrieve(self.user_requirement) - else: - # self.working_memory.add_batch(self.rc.news) - self.rc.memory.add_batch(self.rc.news) - # TODO: implement experience retrieval in multi-round setting - # if self.planner.plan.current_task: - # experience = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task") - # if experience and experience not in [msg.content for msg in self.rc.memory.get()]: - # exp_msg = Message(content=experience, role="assistant") - # self.rc.memory.add(exp_msg) - # example = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task") - - plan_status = self.planner.plan.model_dump(include=["goal", "tasks"]) - for task in plan_status["tasks"]: - task.pop("code") - task.pop("result") - task.pop("is_success") - # print(plan_status) - current_task = ( - self.planner.plan.current_task.model_dump(exclude=["code", "result", "is_success"]) - if self.planner.plan.current_task - else "" - ) - - tools = await self.tool_recommender.recommend_tools() - tool_info = json.dumps({tool.name: tool.schemas for tool in tools}) - prompt = CMD_PROMPT.format( - plan_status=plan_status, - current_task=current_task, - example=example, - # available_commands=prepare_command_prompt(self.available_commands), - available_commands=tool_info, - ) - # context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")]) - context = self.llm.format_msg(self.rc.memory.get(10) + [Message(content=prompt, role="user")]) - - print(*context, sep="\n" + "*" * 5 + "\n") - - self.command_rsp = await self.llm.aask(context) - - # self.rc.working_memory.add(Message(content=rsp, role="assistant")) - self.rc.memory.add(Message(content=self.command_rsp, role="assistant")) - - return True - - async def _act(self) -> Message: - try: - commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=self.command_rsp)) - except Exception as e: - tb = traceback.format_exc() - print(tb) - error_msg = Message(content=str(e), role="user") - self.rc.memory.add(error_msg) - return error_msg - outputs = await self.run_commands(commands) - # self.rc.working_memory.add(Message(content=outputs, role="user")) - self.rc.memory.add(Message(content=outputs, role="user")) - return Message(content="Task completed", role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode) - - async def _react(self) -> Message: - actions_taken = 0 - rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act - while actions_taken < self.rc.max_react_loop: - # NOTE: difference here, keep observing within react - await self._observe() - # think - has_todo = await self._think() - if not has_todo: - break - # act - logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}") - rsp = await self._act() - actions_taken += 1 - return rsp # return output from the last action - - async def run_commands(self, commands) -> list: - tool_execute_map = { + def set_tool_execution_map(self) -> "RoleZero": + self.tool_execute_map = { "Plan.append_task": self.planner.plan.append_task, "Plan.reset_task": self.planner.plan.reset_task, "Plan.replace_task": self.planner.plan.replace_task, "Editor.write": self.editor.write, "Editor.write_content": self.editor.write_content, "Editor.read": self.editor.read, + "MGXEnv.ask_human": dummy_func, + "MGXEnv.reply_to_human": dummy_func, } + return self - # print(*commands, sep="\n") - - is_success = True - outputs = ["Commands executed."] - for cmd in commands: - if cmd["command_name"] in tool_execute_map: - try: - output = tool_execute_map[cmd["command_name"]](**cmd["args"]) - if output: - outputs.append(f"Output for {cmd['command_name']}: {str(output)}") - except Exception as e: - tb = traceback.format_exc() - print(e, tb) - outputs.append(tb) - is_success = False - break # Stop executing if any command fails - outputs = "\n\n".join(outputs) - - # Handle finish_current_task and end individually as a last step - for cmd in commands: - if ( - is_success - and cmd["command_name"] == "Plan.finish_current_task" - and not self.planner.plan.is_plan_finished() - ): - task_result = TaskResult(code=str(commands), result=outputs, is_success=is_success) - self.planner.plan.current_task.update_task_result(task_result=task_result) - self.planner.plan.finish_current_task() - # self.rc.working_memory.clear() - - elif cmd["command_name"] == "Common.end": - self._set_state(-1) - - return outputs - - -WINE_REQ = "Run data analysis on sklearn Wine recognition dataset, and train a model to predict wine class (20% as validation), and show validation accuracy." GAME_REQ_2048 = f""" Create a 2048 game, follow the design doc and task doc. Write your code under /Users/gary/Files/temp/workspace/2048_game/src. @@ -202,17 +54,23 @@ Design doc: Task doc: {design_doc_snake} """ +GAME_REQ_2048_NO_DOC = """ +Create a 2048 game with pygame. Write your code under /Users/gary/Files/temp/workspace/2048_game/src. +Consider what files you will write, break down the requests to multiple tasks and write one file in each task. +After writing all codes, write a code review for the codes, make improvement or adjustment based on the review. +Notice: You MUST implement the full code, don't leave comment without implementation! +""" GAME_INC_REQ_2048 = """ I found an issue with the 2048 code: when tiles are merged, no new tiles pop up. Write code review for the codes (game.py, main.py, ui.py) under under /Users/gary/Files/temp/workspace/2048_game_bugs/src. Then correct any issues you find. You can review all code in one time, and solve issues in one time. """ GAME_INC_REQ_SNAKE = """ -Based on the design doc at /Users/gary/Files/temp/workspace/snake_game_bugs/docs/20240513200737.json, +Found this issue, TypeError: generate_new_position() missing 1 required positional argument: 'snake_body' Write code review for the codes (food.py, game.py, main.py, snake.py, ui.py) under under /Users/gary/Files/temp/workspace/snake_game_bugs/src. -Then correct any issues you find. You can read the design doc first, then review all code in one time, and solve issues in one time. +Then correct any issues you find. You can review all code in one time, and solve issues in one time. """ if __name__ == "__main__": - engineer2 = Engineer2(tools=["Plan", "Editor:write,read,write_content", "MGXEnv:ask_human,reply_to_human"]) - asyncio.run(engineer2.run(GAME_INC_REQ_2048)) + engineer2 = Engineer2() + asyncio.run(engineer2.run(GAME_REQ_2048_NO_DOC)) diff --git a/metagpt/roles/di/role_zero.py b/metagpt/roles/di/role_zero.py new file mode 100644 index 000000000..8d6455d90 --- /dev/null +++ b/metagpt/roles/di/role_zero.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +import inspect +import json +import traceback +from typing import Literal + +from pydantic import model_validator + +from metagpt.actions import Action +from metagpt.actions.di.run_command import RunCommand +from metagpt.logs import logger +from metagpt.prompts.di.role_zero import CMD_PROMPT +from metagpt.roles import Role +from metagpt.schema import Message +from metagpt.strategy.experience_retriever import KeywordExpRetriever +from metagpt.strategy.planner import Planner +from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender +from metagpt.utils.common import CodeParser + + +class RoleZero(Role): + name: str = "Zero" + profile: str = "RoleZero" + goal: str = "" + system_msg: str = "" + cmd_prompt: str = CMD_PROMPT + + react_mode: Literal["react"] = "react" + max_react_loop: int = 20 # used for react mode + + user_requirement: str = "" + command_rsp: str = "" # the raw string containing the commands + commands: list[dict] = [] # commands to be executed + memory_k: int = 20 # number of memories (messages) to use as historical context + + tools: list[str] = [] # Use special symbol [""] to indicate use of all registered tools + tool_recommender: ToolRecommender = None + tool_execution_map: dict[str, callable] = {} + + @model_validator(mode="after") + def set_plan_and_tool(self) -> "RoleZero": + # We force using this parameter for DataAnalyst + assert self.react_mode == "react" + + # Roughly the same part as DataInterpreter.set_plan_and_tool + self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop) + if self.tools and not self.tool_recommender: + self.tool_recommender = BM25ToolRecommender(tools=self.tools, force=True) + self.set_actions([RunCommand]) + self._set_state(0) + + # HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode + self.planner = Planner(goal="", working_memory=self.rc.working_memory, auto_run=True) + + return self + + @model_validator(mode="after") + def set_tool_execution_map(self) -> "RoleZero": + raise NotImplementedError + + async def _think(self) -> bool: + """Useful in 'react' mode. Use LLM to decide whether and what to do next.""" + if not self.rc.todo and not self.rc.news: + return False + + self._set_state(0) + example = "" + if not self.planner.plan.goal: + self.user_requirement = self.get_memories()[-1].content + self.planner.plan.goal = self.user_requirement + example = KeywordExpRetriever().retrieve(self.user_requirement) + else: + self.rc.memory.add_batch(self.rc.news) + # TODO: implement experience retrieval in multi-round setting + # if self.planner.plan.current_task: + # experience = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task") + # if experience and experience not in [msg.content for msg in self.rc.memory.get()]: + # exp_msg = Message(content=experience, role="assistant") + # self.rc.memory.add(exp_msg) + # example = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task") + + plan_status = self.planner.plan.model_dump(include=["goal", "tasks"]) + for task in plan_status["tasks"]: + task.pop("code") + task.pop("result") + task.pop("is_success") + # print(plan_status) + current_task = ( + self.planner.plan.current_task.model_dump(exclude=["code", "result", "is_success"]) + if self.planner.plan.current_task + else "" + ) + + tools = await self.tool_recommender.recommend_tools() + tool_info = json.dumps({tool.name: tool.schemas for tool in tools}) + prompt = self.cmd_prompt.format( + plan_status=plan_status, + current_task=current_task, + example=example, + available_commands=tool_info, + ) + context = self.llm.format_msg(self.rc.memory.get(self.memory_k) + [Message(content=prompt, role="user")]) + + print(*context, sep="\n" + "*" * 5 + "\n") + + self.command_rsp = await self.llm.aask(context) + + self.rc.memory.add(Message(content=self.command_rsp, role="assistant")) + + return True + + async def _act(self) -> Message: + try: + commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=self.command_rsp)) + except Exception as e: + tb = traceback.format_exc() + print(tb) + error_msg = Message(content=str(e), role="user") + self.rc.memory.add(error_msg) + return error_msg + outputs = await self._run_commands(commands) + self.rc.memory.add(Message(content=outputs, role="user")) + return Message(content="Task completed", role="assistant", sent_from=self._setting, cause_by=RunCommand) + + async def _react(self) -> Message: + actions_taken = 0 + rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act + while actions_taken < self.rc.max_react_loop: + # NOTE: difference here, keep observing within react + await self._observe() + # think + has_todo = await self._think() + if not has_todo: + break + # act + logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}") + rsp = await self._act() + actions_taken += 1 + return rsp # return output from the last action + + async def _run_commands(self, commands) -> list: + outputs = [] + for cmd in commands: + # handle special command first + if await self._run_special_command(cmd): + continue + # run command as specified by tool_execute_map + if cmd["command_name"] in self.tool_execute_map: + tool_obj = self.tool_execute_map[cmd["command_name"]] + output = f"Command {cmd['command_name']} executed" + try: + if inspect.iscoroutinefunction(tool_obj): + tool_output = await tool_obj(**cmd["args"]) + else: + tool_output = tool_obj(**cmd["args"]) + if tool_output: + output += f": {str(tool_output)}" + outputs.append(output) + except Exception as e: + tb = traceback.format_exc() + print(e, tb) + outputs.append(output + f": {tb}") + break # Stop executing if any command fails + else: + outputs.append(f"Command {cmd['command_name']} not found.") + break + outputs = "\n\n".join(outputs) + + return outputs + + async def _run_special_command(self, cmd) -> bool: + """command requiring special check or parsing""" + is_special_cmd = cmd["command_name"] in ["Plan.finish_current_task", "Common.end"] + + if cmd["command_name"] == "Plan.finish_current_task" and not self.planner.plan.is_plan_finished(): + # task_result = TaskResult(code=str(commands), result=outputs, is_success=is_success) + # self.planner.plan.current_task.update_task_result(task_result=task_result) + self.planner.plan.finish_current_task() + + elif cmd["command_name"] == "end": + self._set_state(-1) + + return is_special_cmd