diff --git a/metagpt/prompts/di/data_analyst.py b/metagpt/prompts/di/data_analyst.py new file mode 100644 index 000000000..1f69262af --- /dev/null +++ b/metagpt/prompts/di/data_analyst.py @@ -0,0 +1,40 @@ +CMD_PROMPT = """ +# Data Structure +class Task(BaseModel): + task_id: str = "" + dependent_task_ids: list[str] = [] + instruction: str = "" + task_type: str = "" + assignee: str = "David" + +# Available Commands +{available_commands} + +# Current Plan +{plan_status} + +# Example +{example} + +# Instructions +Based on the context, write a plan or modify an existing plan to achieve the goal. A plan consists of one to 3 tasks. +If plan is created, you should track the progress and update the plan accordingly, such as finish_current_task, append_task, reset_task, replace_task, etc. +Pay close attention to new user message, review the conversation history, use reply_to_human to respond to new user requirement. +Note: +1. If you keeping encountering errors, unexpected situation, or you are not sure of proceeding, use ask_human to ask for help. + +You may use any of the available commands to create a plan or update the plan. You may output mutiple commands, they will be executed sequentially. +If you finish current task, you will automatically take the next task in the existing plan, use finish_task, DON'T append a new task. + +# Your commands in a json array, in the following output format, always output a json array, if there is nothing to do, use the pass command: +Some text indicating your thoughts, such as how you should update the plan status, respond to inquiry, or seek for help. Then a json array of commands. +```json +[ + {{ + "command_name": str, + "args": {{"arg_name": arg_value, ...}} + }}, + ... +] +``` +""" diff --git a/metagpt/prompts/di/write_analysis_code.py b/metagpt/prompts/di/write_analysis_code.py index d2b4f1299..af941808d 100644 --- a/metagpt/prompts/di/write_analysis_code.py +++ b/metagpt/prompts/di/write_analysis_code.py @@ -1,7 +1,9 @@ INTERPRETER_SYSTEM_MSG = """ As a data scientist, you need to help user to achieve their goal step by step in a continuous Jupyter notebook. Since it is a notebook environment, don't use asyncio.run. Instead, use await if you need to call an async function. -If you want to use shell command such as git clone, pip install packages, navigate folders, read file, etc., use Terminal tool if available before trying ! in notebook block. +If you want to use shell command such as git clone, pip install packages, navigate folders, read file, etc., use Terminal tool if available. DON'T use ! in notebook block. +Don't write all codes in one response, each time, just write code for one step or current task. +While some concise thoughts are helpful, code is absolutely required. Always output one and only one code block in your response. """ STRUCTUAL_PROMPT = """ diff --git a/metagpt/roles/di/data_analyst.py b/metagpt/roles/di/data_analyst.py new file mode 100644 index 000000000..84f72664b --- /dev/null +++ b/metagpt/roles/di/data_analyst.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import json +from typing import Literal + +from pydantic import model_validator + +from metagpt.actions import Action +from metagpt.actions.di.write_analysis_code import WriteAnalysisCode +from metagpt.logs import logger +from metagpt.prompts.di.data_analyst import CMD_PROMPT +from metagpt.roles.di.data_interpreter import DataInterpreter +from metagpt.schema import Message, TaskResult +from metagpt.strategy.planner import Planner +from metagpt.strategy.thinking_command import ( + Command, + prepare_command_prompt, + run_commands, +) +from metagpt.tools.tool_recommend import BM25ToolRecommender +from metagpt.utils.common import CodeParser + + +class DataAnalyst(DataInterpreter): + name: str = "David" + profile: str = "DataAnalyst" + react_mode: Literal["react"] = "react" + max_react_loop: int = 20 # used for react mode + task_result: TaskResult = None + available_commands: list[Command] = [ + Command.APPEND_TASK, + Command.RESET_TASK, + Command.REPLACE_TASK, + Command.FINISH_CURRENT_TASK, + # Command.PUBLISH_MESSAGE, + Command.ASK_HUMAN, + Command.REPLY_TO_HUMAN, + # Command.PASS, + ] + commands: list[dict] = [] # issued commands to be executed + + @model_validator(mode="after") + def set_plan_and_tool(self) -> "DataInterpreter": + # We force using this parameter for DataAnalyst + assert self.react_mode == "react" + assert self.auto_run + assert self.use_plan + + # Roughly the same part as DataInterpreter.set_plan_and_tool + self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run) + if self.tools and not self.tool_recommender: + self.tool_recommender = BM25ToolRecommender(tools=self.tools) + self.set_actions([WriteAnalysisCode]) + self._set_state(0) + + # HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode + self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=True) + + return self + + async def _think(self) -> bool: + """Useful in 'react' mode. Use LLM to decide whether and what to do next.""" + self._set_state(0) + if not self.planner.plan.goal: + self.user_requirement = self.get_memories()[-1].content + self.planner.plan.goal = self.user_requirement + else: + self.working_memory.add_batch(self.rc.news) + + plan_status = self.planner.plan.model_dump(include=["goal", "tasks"]) + for task in plan_status["tasks"]: + task.pop("code") + task.pop("result") + example = "" + prompt = CMD_PROMPT.format( + plan_status=plan_status, + example=example, + available_commands=prepare_command_prompt(self.available_commands), + ) + context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")]) + + rsp = await self.llm.aask(context) + self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp)) + self.rc.memory.add(Message(content=rsp, role="assistant")) + + await run_commands(self, self.commands) + + return bool(self.rc.todo) + + async def _act(self) -> Message: + """Useful in 'react' mode. Return a Message conforming to Role._act interface.""" + logger.info(f"ready to take on task {self.planner.plan.current_task}") + code, result, is_success = await self._write_and_exec_code() + self.planner.plan.current_task.is_success = ( + is_success # mark is_success, determine is_finished later in thinking + ) + self.task_result = TaskResult(code=code, result=result, is_success=is_success) + return Message(content="Task completed", role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode) + + async def _react(self) -> Message: + actions_taken = 0 + rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act + while actions_taken < self.rc.max_react_loop: + # NOTE: difference here, keep observing within react + await self._observe() + # think + has_todo = await self._think() + if not has_todo: + break + # act + logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}") + rsp = await self._act() + actions_taken += 1 + return rsp # return output from the last action diff --git a/metagpt/roles/di/data_interpreter.py b/metagpt/roles/di/data_interpreter.py index 2e1e0a2da..f574943cc 100644 --- a/metagpt/roles/di/data_interpreter.py +++ b/metagpt/roles/di/data_interpreter.py @@ -5,7 +5,7 @@ from typing import Literal from pydantic import Field, model_validator -from metagpt.actions.di.ask_review import ReviewConst +# from metagpt.actions.di.ask_review import ReviewConst from metagpt.actions.di.execute_nb_code import ExecuteNbCode from metagpt.actions.di.write_analysis_code import CheckData, WriteAnalysisCode from metagpt.logs import logger @@ -43,6 +43,7 @@ class DataInterpreter(Role): tool_recommender: ToolRecommender = None react_mode: Literal["plan_and_act", "react"] = "plan_and_act" max_react_loop: int = 10 # used for react mode + user_requirement: str = "" @model_validator(mode="after") def set_plan_and_tool(self) -> "Interpreter": @@ -62,7 +63,7 @@ class DataInterpreter(Role): async def _think(self) -> bool: """Useful in 'react' mode. Use LLM to decide whether and what to do next.""" - user_requirement = self.get_memories()[-1].content + self.user_requirement = self.get_memories()[-1].content context = self.working_memory.get() if not context: @@ -71,7 +72,7 @@ class DataInterpreter(Role): self._set_state(0) return True - prompt = REACT_THINK_PROMPT.format(user_requirement=user_requirement, context=context) + prompt = REACT_THINK_PROMPT.format(user_requirement=self.user_requirement, context=context) rsp = await self.llm.aask(prompt) rsp_dict = json.loads(CodeParser.parse_code(block=None, text=rsp)) self.working_memory.add(Message(content=rsp_dict["thoughts"], role="assistant")) @@ -83,7 +84,7 @@ class DataInterpreter(Role): async def _act(self) -> Message: """Useful in 'react' mode. Return a Message conforming to Role._act interface.""" code, _, _ = await self._write_and_exec_code() - return Message(content=code, role="assistant", cause_by=WriteAnalysisCode) + return Message(content=code, role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode) async def _plan_and_act(self) -> Message: self._set_state(0) @@ -136,11 +137,11 @@ class DataInterpreter(Role): ### process execution result ### counter += 1 - if not success and counter >= max_retry: - logger.info("coding failed!") - review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) - if ReviewConst.CHANGE_WORDS[0] in review: - counter = 0 # redo the task again with help of human suggestions + # if not success and counter >= max_retry: + # logger.info("coding failed!") + # review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) + # if ReviewConst.CHANGE_WORDS[0] in review: + # counter = 0 # redo the task again with help of human suggestions return code, result, success @@ -154,10 +155,8 @@ class DataInterpreter(Role): logger.info(f"ready to {todo.name}") use_reflection = counter > 0 and self.use_reflection # only use reflection after the first trial - user_requirement = self.get_memories()[-1].content - code = await todo.run( - user_requirement=user_requirement, + user_requirement=self.user_requirement, plan_status=plan_status, tool_info=tool_info, working_memory=self.working_memory.get(), diff --git a/metagpt/roles/di/team_leader.py b/metagpt/roles/di/team_leader.py index 0224b11dc..bf1619308 100644 --- a/metagpt/roles/di/team_leader.py +++ b/metagpt/roles/di/team_leader.py @@ -5,17 +5,20 @@ import json from pydantic import model_validator from metagpt.actions.di.run_command import RunCommand -from metagpt.environment.mgx.mgx_env import MGXEnv from metagpt.prompts.di.team_leader import ( CMD_PROMPT, FINISH_CURRENT_TASK_CMD, SYSTEM_PROMPT, ) from metagpt.roles import Role -from metagpt.schema import Message, Task, TaskResult +from metagpt.schema import Message, TaskResult from metagpt.strategy.experience_retriever import SimpleExpRetriever from metagpt.strategy.planner import Planner -from metagpt.strategy.thinking_command import Command, prepare_command_prompt +from metagpt.strategy.thinking_command import ( + Command, + prepare_command_prompt, + run_commands, +) from metagpt.utils.common import CodeParser @@ -37,39 +40,12 @@ class TeamLeader(Role): @model_validator(mode="after") def set_plan(self) -> "TeamLeader": + self.rc.working_memory = ( + self.rc.memory + ) # TeamLeader does not need working memory, all messages should go into memory self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=True) return self - async def _run_env_command(self, cmd): - assert isinstance(self.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv" - if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name: - self.publish_message(Message(**cmd["args"])) - elif cmd["command_name"] == Command.ASK_HUMAN.cmd_name: - await self.rc.env.ask_human(sent_from=self, **cmd["args"]) - elif cmd["command_name"] == Command.REPLY_TO_HUMAN.cmd_name: - await self.rc.env.reply_to_human(sent_from=self, **cmd["args"]) - - def _run_internal_command(self, cmd): - if cmd["command_name"] == Command.APPEND_TASK.cmd_name: - self.planner.plan.append_task(Task(**cmd["args"])) - elif cmd["command_name"] == Command.RESET_TASK.cmd_name: - self.planner.plan.reset_task(**cmd["args"]) - elif cmd["command_name"] == Command.REPLACE_TASK.cmd_name: - self.planner.plan.replace_task(Task(**cmd["args"])) - elif cmd["command_name"] == Command.FINISH_CURRENT_TASK.cmd_name: - self.planner.plan.current_task.update_task_result(task_result=self.task_result) - self.planner.plan.finish_current_task() - self.rc.working_memory.clear() - - async def run_commands(self, cmds): - print(*cmds, sep="\n") - for cmd in cmds: - await self._run_env_command(cmd) - self._run_internal_command(cmd) - - if self.planner.plan.is_plan_finished(): - self._set_state(-1) - async def _think(self) -> bool: """Useful in 'react' mode. Use LLM to decide whether and what to do next.""" @@ -104,7 +80,7 @@ class TeamLeader(Role): async def _act(self) -> Message: """Useful in 'react' mode. Return a Message conforming to Role._act interface.""" - await self.run_commands(self.commands) + await run_commands(self, self.commands) self.task_result = TaskResult(result="Success", is_success=True) msg = Message(content="Commands executed", send_to="no one") # a dummy message to conform to the interface self.rc.memory.add(msg) diff --git a/metagpt/roles/role.py b/metagpt/roles/role.py index 9b15ab9a5..02e02b0eb 100644 --- a/metagpt/roles/role.py +++ b/metagpt/roles/role.py @@ -416,11 +416,11 @@ class Role(SerializationMixin, ContextMixin, BaseModel): news = self.rc.msg_buffer.pop_all() # Store the read messages in your own memory to prevent duplicate processing. old_messages = [] if ignore_memory else self.rc.memory.get() - self.rc.memory.add_batch(news) - # Filter out messages of interest. + # Filter in messages of interest. self.rc.news = [ n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages ] + self.rc.memory.add_batch(self.rc.news) # only save messages of interest into memory self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None # record the latest observed msg # Design Rules: diff --git a/metagpt/strategy/thinking_command.py b/metagpt/strategy/thinking_command.py index 53b206da8..78c975e6c 100644 --- a/metagpt/strategy/thinking_command.py +++ b/metagpt/strategy/thinking_command.py @@ -2,6 +2,10 @@ from enum import Enum from pydantic import BaseModel +from metagpt.environment.mgx.mgx_env import MGXEnv +from metagpt.roles import Role +from metagpt.schema import Message, Task + class CommandDef(BaseModel): name: str @@ -66,3 +70,39 @@ def prepare_command_prompt(commands: list[Command]) -> str: for i, command in enumerate(commands): command_prompt += f"{i+1}. {command.value.signature}:\n{command.value.desc}\n\n" return command_prompt + + +async def run_env_command(role: Role, cmd): + assert isinstance(role.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv" + if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name: + role.publish_message(Message(**cmd["args"])) + if cmd["command_name"] == Command.ASK_HUMAN.cmd_name: + role.rc.working_memory.add(Message(content=cmd["args"]["question"], role="assistant")) + human_rsp = await role.rc.env.ask_human(sent_from=role, **cmd["args"]) + role.rc.working_memory.add(Message(content=human_rsp, role="user")) + elif cmd["command_name"] == Command.REPLY_TO_HUMAN.cmd_name: + # TODO: consider if the message should go into memory + await role.rc.env.reply_to_human(sent_from=role, **cmd["args"]) + + +def run_internal_command(role: Role, cmd): + if cmd["command_name"] == Command.APPEND_TASK.cmd_name: + role.planner.plan.append_task(Task(**cmd["args"])) + elif cmd["command_name"] == Command.RESET_TASK.cmd_name: + role.planner.plan.reset_task(**cmd["args"]) + elif cmd["command_name"] == Command.REPLACE_TASK.cmd_name: + role.planner.plan.replace_task(Task(**cmd["args"])) + elif cmd["command_name"] == Command.FINISH_CURRENT_TASK.cmd_name: + role.planner.plan.current_task.update_task_result(task_result=role.task_result) + role.planner.plan.finish_current_task() + role.rc.working_memory.clear() + + +async def run_commands(role: Role, cmds): + print(*cmds, sep="\n") + for cmd in cmds: + await run_env_command(role, cmd) + run_internal_command(role, cmd) + + if role.planner.plan.is_plan_finished(): + role._set_state(-1)