From ddaecf12eb86574d93f49f28c5f4be89ecaee55d Mon Sep 17 00:00:00 2001 From: lidanyang Date: Thu, 27 Jun 2024 11:23:00 +0800 Subject: [PATCH] apply data_analyst to role_zero --- metagpt/prompts/di/role_zero.py | 2 +- metagpt/prompts/di/write_analysis_code.py | 5 +- metagpt/roles/di/data_analyst.py | 178 ++++++++-------------- metagpt/roles/di/role_zero.py | 11 +- metagpt/tools/tool_recommend.py | 4 +- 5 files changed, 83 insertions(+), 117 deletions(-) diff --git a/metagpt/prompts/di/role_zero.py b/metagpt/prompts/di/role_zero.py index 4d52476aa..04344fa1e 100644 --- a/metagpt/prompts/di/role_zero.py +++ b/metagpt/prompts/di/role_zero.py @@ -5,7 +5,7 @@ When presented a current task, tackle the task using the available commands. Pay close attention to new user message, review the conversation history, use RoleZero.reply_to_human to respond to new user requirement. Note: 1. If you keeping encountering errors, unexpected situation, or you are not sure of proceeding, use RoleZero.ask_human to ask for help. -2. Carefully review your progress at the current task, if your actions so far has not fulfilled the task instruction, you should continue with current task. Otherwise, finish current task. +2. Carefully review your progress at the current task, if your actions so far has not fulfilled the task instruction, you should continue with current task. Otherwise, finish current task by Plan.finish_current_task. 3. Each time you finish a task, use RoleZero.reply_to_human to report your progress. """ diff --git a/metagpt/prompts/di/write_analysis_code.py b/metagpt/prompts/di/write_analysis_code.py index af941808d..1d743a719 100644 --- a/metagpt/prompts/di/write_analysis_code.py +++ b/metagpt/prompts/di/write_analysis_code.py @@ -28,7 +28,10 @@ your code ``` """ -REFLECTION_SYSTEM_MSG = """You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation.""" +REFLECTION_SYSTEM_MSG = """ +You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation. +When occuring ModuleNotFoundError, always install the required package. And use Terminal tool if available. +""" DEBUG_REFLECTION_EXAMPLE = ''' [previous impl]: diff --git a/metagpt/roles/di/data_analyst.py b/metagpt/roles/di/data_analyst.py index d4d67742b..f13fc78fb 100644 --- a/metagpt/roles/di/data_analyst.py +++ b/metagpt/roles/di/data_analyst.py @@ -1,134 +1,88 @@ from __future__ import annotations -import json -from typing import Literal +from pydantic import Field -from pydantic import model_validator - -from metagpt.actions import Action +from metagpt.actions.di.execute_nb_code import ExecuteNbCode from metagpt.actions.di.write_analysis_code import WriteAnalysisCode from metagpt.logs import logger -from metagpt.prompts.di.data_analyst import CMD_PROMPT -from metagpt.roles.di.data_interpreter import DataInterpreter -from metagpt.schema import Message, TaskResult -from metagpt.strategy.experience_retriever import KeywordExpRetriever -from metagpt.strategy.planner import Planner -from metagpt.strategy.thinking_command import ( - Command, - prepare_command_prompt, - run_commands, -) -from metagpt.tools.tool_recommend import BM25ToolRecommender -from metagpt.utils.common import CodeParser -from metagpt.utils.report import ThoughtReporter +from metagpt.roles.di.role_zero import RoleZero +from metagpt.schema import TaskResult, Message +from metagpt.tools.tool_registry import register_tool -class DataAnalyst(DataInterpreter): +@register_tool(include_functions=["write_and_exec_code"]) +class DataAnalyst(RoleZero): name: str = "David" profile: str = "DataAnalyst" goal: str = "Take on any data-related tasks, such as data analysis, machine learning, deep learning, web browsing, web scraping, web searching, web deployment, terminal operation, git and github operation, etc." - react_mode: Literal["react"] = "react" - max_react_loop: int = 20 # used for react mode + + tools: list[str] = ["Plan", "DataAnalyst", "RoleZero"] + custom_tools: list[str] = ["machine learning", "web scraping", "Terminal"] + + use_reflection: bool = True + write_code: WriteAnalysisCode = Field(default_factory=WriteAnalysisCode, exclude=True) + execute_code: ExecuteNbCode = Field(default_factory=ExecuteNbCode, exclude=True) task_result: TaskResult = None - available_commands: list[Command] = [ - Command.APPEND_TASK, - Command.RESET_TASK, - Command.REPLACE_TASK, - Command.FINISH_CURRENT_TASK, - # Command.PUBLISH_MESSAGE, - Command.ASK_HUMAN, - Command.REPLY_TO_HUMAN, - # Command.PASS, - ] - commands: list[dict] = [] # issued commands to be executed - user_requirement: str = "" - @model_validator(mode="after") - def set_plan_and_tool(self) -> "DataInterpreter": - # We force using this parameter for DataAnalyst - assert self.react_mode == "react" - assert self.auto_run - assert self.use_plan + def _update_tool_execution(self): + self.tool_execution_map = { + "Plan.append_task": self.planner.plan.append_task, + "Plan.reset_task": self.planner.plan.reset_task, + "Plan.replace_task": self.planner.plan.replace_task, + "DataAnalyst.write_and_exec_code": self.write_and_exec_code, + "RoleZero.ask_human": self.ask_human, + "RoleZero.reply_to_human": self.reply_to_human, + } - # Roughly the same part as DataInterpreter.set_plan_and_tool - self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run) - if self.tools and not self.tool_recommender: - self.tool_recommender = BM25ToolRecommender(tools=self.tools) - self.set_actions([WriteAnalysisCode]) + async def write_and_exec_code(self): + """Write a code block for current task and execute it in an interactive notebook environment.""" + counter = 0 + success = False - # HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode - self.planner = Planner(goal="", working_memory=self.rc.working_memory, auto_run=True) + # plan info + plan_status = self.planner.get_plan_status() - return self + # tool info + if self.custom_tool_recommender: + plan = self.planner.plan + fix = ["Terminal"] if "Terminal" in self.custom_tools else None + tool_info = await self.custom_tool_recommender.get_recommended_tool_info(fix=fix, plan=plan) + else: + tool_info = "" - async def _think(self) -> bool: - """Useful in 'react' mode. Use LLM to decide whether and what to do next.""" - self._set_state(0) - example = "" - if not self.planner.plan.goal: - self.user_requirement = self.get_memories()[-1].content - self.planner.plan.goal = self.user_requirement - example = KeywordExpRetriever().retrieve(self.user_requirement) + while not success and counter < 3: + ### write code ### + logger.info(f"ready to WriteAnalysisCode") + use_reflection = (counter > 0 and self.use_reflection) # only use reflection after the first trial - plan_status = self.planner.plan.model_dump(include=["goal", "tasks"]) - # for task in plan_status["tasks"]: - # task.pop("code") - # task.pop("result") - prompt = CMD_PROMPT.format( - plan_status=plan_status, - example=example, - available_commands=prepare_command_prompt(self.available_commands), - ) - context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")]) - # print(*context, sep="\n" + "*" * 5 + "\n") - async with ThoughtReporter(enable_llm_stream=True): - rsp = await self.llm.aask(context) - self.commands = json.loads(CodeParser.parse_code(block=None, lang='json', text=rsp)) - self.rc.working_memory.add(Message(content=rsp, role="assistant")) + code = await self.write_code.run( + user_requirement=self.planner.plan.goal, + plan_status=plan_status, + tool_info=tool_info, + working_memory=self.rc.working_memory.get() if use_reflection else None, + use_reflection=use_reflection, + ) + self.rc.working_memory.add(Message(content=code, role="assistant", cause_by=WriteAnalysisCode)) - await run_commands(self, self.commands, self.rc.working_memory) + ### execute code ### + result, success = await self.execute_code.run(code) + print(result) - return bool(self.rc.todo) + self.rc.working_memory.add(Message(content=result, role="user", cause_by=ExecuteNbCode)) - async def _act(self) -> Message: - """Useful in 'react' mode. Return a Message conforming to Role._act interface.""" - logger.info(f"ready to take on task {self.planner.plan.current_task}") + ### process execution result ### + counter += 1 + self.task_result = TaskResult(code=code, result=result, is_success=success) - # TODO: Consider an appropriate location to insert task experience formally - experience = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task") - if experience and experience not in [msg.content for msg in self.rc.working_memory.get()]: - exp_msg = Message(content=experience, role="assistant") - self.rc.working_memory.add(exp_msg) + output = f""" + Code written: + {code} + Execution status:{'Success' if success else 'Failed'} + Execution result: {result} + """ + self.rc.working_memory.clear() + return output - code, result, is_success = await self._write_and_exec_code() - self.planner.plan.current_task.is_success = ( - is_success # mark is_success, determine is_finished later in thinking - ) - - # FIXME: task result is always overwritten by the last act, whereas it can be made of of multiple acts - self.task_result = TaskResult(code=code, result=result, is_success=is_success) - return Message(content="Task completed", role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode) - - async def _react(self) -> Message: - # NOTE: Diff 1: Each time landing here means observing news, set todo to allow news processing in _think - self._set_state(0) - - actions_taken = 0 - rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act - while actions_taken < self.rc.max_react_loop: - # NOTE: Diff 2: Keep observing within _react, news will go into memory, allowing adapting to new info - # add news from self._observe, the one called in self.run, consider removing when switching from working_memory to memory - self.working_memory.add_batch(self.rc.news) - await self._observe() - # add news from this self._observe, we need twice because _observe rewrites rc.news - self.working_memory.add_batch(self.rc.news) - - # think - has_todo = await self._think() - if not has_todo: - break - # act - logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}") - rsp = await self._act() - actions_taken += 1 - return rsp # return output from the last action + def _finish_current_task(self): + self.planner.current_task.update_task_result(self.task_result) + super()._finish_current_task() diff --git a/metagpt/roles/di/role_zero.py b/metagpt/roles/di/role_zero.py index 39338471a..4225d1f76 100644 --- a/metagpt/roles/di/role_zero.py +++ b/metagpt/roles/di/role_zero.py @@ -41,8 +41,10 @@ class RoleZero(Role): max_react_loop: int = 20 # used for react mode # Tools - tools: list[str] = [] # Use special symbol [""] to indicate use of all registered tools + tools: list[str] = [] tool_recommender: ToolRecommender = None + custom_tools: list[str] = [] + custom_tool_recommender: ToolRecommender = None tool_execution_map: dict[str, Callable] = {} special_tool_commands: list[str] = ["Plan.finish_current_task", "end"] # Equipped with three basic tools by default for optional use @@ -68,6 +70,8 @@ class RoleZero(Role): self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop) if self.tools and not self.tool_recommender: self.tool_recommender = BM25ToolRecommender(tools=self.tools, force=True) + if self.custom_tools and not self.custom_tool_recommender: + self.custom_tool_recommender = BM25ToolRecommender(tools=self.custom_tools) self.set_actions([RunCommand]) # HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode @@ -235,13 +239,16 @@ class RoleZero(Role): if cmd["command_name"] == "Plan.finish_current_task" and not self.planner.plan.is_plan_finished(): # task_result = TaskResult(code=str(commands), result=outputs, is_success=is_success) # self.planner.plan.current_task.update_task_result(task_result=task_result) - self.planner.plan.finish_current_task() + self._finish_current_task() elif cmd["command_name"] == "end": self._set_state(-1) return is_special_cmd + def _finish_current_task(self): + self.planner.plan.finish_current_task() + def _get_plan_status(self) -> Tuple[str, str]: plan_status = self.planner.plan.model_dump(include=["goal", "tasks"]) for task in plan_status["tasks"]: diff --git a/metagpt/tools/tool_recommend.py b/metagpt/tools/tool_recommend.py index 05e8e1400..ab847d10e 100644 --- a/metagpt/tools/tool_recommend.py +++ b/metagpt/tools/tool_recommend.py @@ -101,7 +101,7 @@ class ToolRecommender(BaseModel): return ranked_tools - async def get_recommended_tool_info(self, **kwargs) -> str: + async def get_recommended_tool_info(self, fix: list[str] = None, **kwargs) -> str: """ Wrap recommended tools with their info in a string, which can be used directly in a prompt. """ @@ -109,6 +109,8 @@ class ToolRecommender(BaseModel): if not recommended_tools: return "" tool_schemas = {tool.name: tool.schemas for tool in recommended_tools} + if fix: + tool_schemas.update({tool.name: tool.schemas for tool in self.tools.values() if tool.name in fix}) return TOOL_INFO_PROMPT.format(tool_schemas=tool_schemas) async def recall_tools(self, context: str = "", plan: Plan = None, topk: int = 20) -> list[Tool]: