add data analyst, explore agent controlling own plan by thinking

This commit is contained in:
yzlin 2024-04-29 18:47:29 +08:00
parent f134126a90
commit 5447399ecb
7 changed files with 220 additions and 49 deletions

View file

@ -0,0 +1,40 @@
CMD_PROMPT = """
# Data Structure
class Task(BaseModel):
task_id: str = ""
dependent_task_ids: list[str] = []
instruction: str = ""
task_type: str = ""
assignee: str = "David"
# Available Commands
{available_commands}
# Current Plan
{plan_status}
# Example
{example}
# Instructions
Based on the context, write a plan or modify an existing plan to achieve the goal. A plan consists of one to 3 tasks.
If plan is created, you should track the progress and update the plan accordingly, such as finish_current_task, append_task, reset_task, replace_task, etc.
Pay close attention to new user message, review the conversation history, use reply_to_human to respond to new user requirement.
Note:
1. If you keeping encountering errors, unexpected situation, or you are not sure of proceeding, use ask_human to ask for help.
You may use any of the available commands to create a plan or update the plan. You may output mutiple commands, they will be executed sequentially.
If you finish current task, you will automatically take the next task in the existing plan, use finish_task, DON'T append a new task.
# Your commands in a json array, in the following output format, always output a json array, if there is nothing to do, use the pass command:
Some text indicating your thoughts, such as how you should update the plan status, respond to inquiry, or seek for help. Then a json array of commands.
```json
[
{{
"command_name": str,
"args": {{"arg_name": arg_value, ...}}
}},
...
]
```
"""

View file

@ -1,7 +1,9 @@
INTERPRETER_SYSTEM_MSG = """
As a data scientist, you need to help user to achieve their goal step by step in a continuous Jupyter notebook.
Since it is a notebook environment, don't use asyncio.run. Instead, use await if you need to call an async function.
If you want to use shell command such as git clone, pip install packages, navigate folders, read file, etc., use Terminal tool if available before trying ! in notebook block.
If you want to use shell command such as git clone, pip install packages, navigate folders, read file, etc., use Terminal tool if available. DON'T use ! in notebook block.
Don't write all codes in one response, each time, just write code for one step or current task.
While some concise thoughts are helpful, code is absolutely required. Always output one and only one code block in your response.
"""
STRUCTUAL_PROMPT = """

View file

@ -0,0 +1,114 @@
from __future__ import annotations
import json
from typing import Literal
from pydantic import model_validator
from metagpt.actions import Action
from metagpt.actions.di.write_analysis_code import WriteAnalysisCode
from metagpt.logs import logger
from metagpt.prompts.di.data_analyst import CMD_PROMPT
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.schema import Message, TaskResult
from metagpt.strategy.planner import Planner
from metagpt.strategy.thinking_command import (
Command,
prepare_command_prompt,
run_commands,
)
from metagpt.tools.tool_recommend import BM25ToolRecommender
from metagpt.utils.common import CodeParser
class DataAnalyst(DataInterpreter):
name: str = "David"
profile: str = "DataAnalyst"
react_mode: Literal["react"] = "react"
max_react_loop: int = 20 # used for react mode
task_result: TaskResult = None
available_commands: list[Command] = [
Command.APPEND_TASK,
Command.RESET_TASK,
Command.REPLACE_TASK,
Command.FINISH_CURRENT_TASK,
# Command.PUBLISH_MESSAGE,
Command.ASK_HUMAN,
Command.REPLY_TO_HUMAN,
# Command.PASS,
]
commands: list[dict] = [] # issued commands to be executed
@model_validator(mode="after")
def set_plan_and_tool(self) -> "DataInterpreter":
# We force using this parameter for DataAnalyst
assert self.react_mode == "react"
assert self.auto_run
assert self.use_plan
# Roughly the same part as DataInterpreter.set_plan_and_tool
self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run)
if self.tools and not self.tool_recommender:
self.tool_recommender = BM25ToolRecommender(tools=self.tools)
self.set_actions([WriteAnalysisCode])
self._set_state(0)
# HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode
self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=True)
return self
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
self._set_state(0)
if not self.planner.plan.goal:
self.user_requirement = self.get_memories()[-1].content
self.planner.plan.goal = self.user_requirement
else:
self.working_memory.add_batch(self.rc.news)
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
for task in plan_status["tasks"]:
task.pop("code")
task.pop("result")
example = ""
prompt = CMD_PROMPT.format(
plan_status=plan_status,
example=example,
available_commands=prepare_command_prompt(self.available_commands),
)
context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")])
rsp = await self.llm.aask(context)
self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp))
self.rc.memory.add(Message(content=rsp, role="assistant"))
await run_commands(self, self.commands)
return bool(self.rc.todo)
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
logger.info(f"ready to take on task {self.planner.plan.current_task}")
code, result, is_success = await self._write_and_exec_code()
self.planner.plan.current_task.is_success = (
is_success # mark is_success, determine is_finished later in thinking
)
self.task_result = TaskResult(code=code, result=result, is_success=is_success)
return Message(content="Task completed", role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode)
async def _react(self) -> Message:
actions_taken = 0
rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act
while actions_taken < self.rc.max_react_loop:
# NOTE: difference here, keep observing within react
await self._observe()
# think
has_todo = await self._think()
if not has_todo:
break
# act
logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
rsp = await self._act()
actions_taken += 1
return rsp # return output from the last action

View file

@ -5,7 +5,7 @@ from typing import Literal
from pydantic import Field, model_validator
from metagpt.actions.di.ask_review import ReviewConst
# from metagpt.actions.di.ask_review import ReviewConst
from metagpt.actions.di.execute_nb_code import ExecuteNbCode
from metagpt.actions.di.write_analysis_code import CheckData, WriteAnalysisCode
from metagpt.logs import logger
@ -43,6 +43,7 @@ class DataInterpreter(Role):
tool_recommender: ToolRecommender = None
react_mode: Literal["plan_and_act", "react"] = "plan_and_act"
max_react_loop: int = 10 # used for react mode
user_requirement: str = ""
@model_validator(mode="after")
def set_plan_and_tool(self) -> "Interpreter":
@ -62,7 +63,7 @@ class DataInterpreter(Role):
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
user_requirement = self.get_memories()[-1].content
self.user_requirement = self.get_memories()[-1].content
context = self.working_memory.get()
if not context:
@ -71,7 +72,7 @@ class DataInterpreter(Role):
self._set_state(0)
return True
prompt = REACT_THINK_PROMPT.format(user_requirement=user_requirement, context=context)
prompt = REACT_THINK_PROMPT.format(user_requirement=self.user_requirement, context=context)
rsp = await self.llm.aask(prompt)
rsp_dict = json.loads(CodeParser.parse_code(block=None, text=rsp))
self.working_memory.add(Message(content=rsp_dict["thoughts"], role="assistant"))
@ -83,7 +84,7 @@ class DataInterpreter(Role):
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
code, _, _ = await self._write_and_exec_code()
return Message(content=code, role="assistant", cause_by=WriteAnalysisCode)
return Message(content=code, role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode)
async def _plan_and_act(self) -> Message:
self._set_state(0)
@ -136,11 +137,11 @@ class DataInterpreter(Role):
### process execution result ###
counter += 1
if not success and counter >= max_retry:
logger.info("coding failed!")
review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
if ReviewConst.CHANGE_WORDS[0] in review:
counter = 0 # redo the task again with help of human suggestions
# if not success and counter >= max_retry:
# logger.info("coding failed!")
# review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
# if ReviewConst.CHANGE_WORDS[0] in review:
# counter = 0 # redo the task again with help of human suggestions
return code, result, success
@ -154,10 +155,8 @@ class DataInterpreter(Role):
logger.info(f"ready to {todo.name}")
use_reflection = counter > 0 and self.use_reflection # only use reflection after the first trial
user_requirement = self.get_memories()[-1].content
code = await todo.run(
user_requirement=user_requirement,
user_requirement=self.user_requirement,
plan_status=plan_status,
tool_info=tool_info,
working_memory=self.working_memory.get(),

View file

@ -5,17 +5,20 @@ import json
from pydantic import model_validator
from metagpt.actions.di.run_command import RunCommand
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.prompts.di.team_leader import (
CMD_PROMPT,
FINISH_CURRENT_TASK_CMD,
SYSTEM_PROMPT,
)
from metagpt.roles import Role
from metagpt.schema import Message, Task, TaskResult
from metagpt.schema import Message, TaskResult
from metagpt.strategy.experience_retriever import SimpleExpRetriever
from metagpt.strategy.planner import Planner
from metagpt.strategy.thinking_command import Command, prepare_command_prompt
from metagpt.strategy.thinking_command import (
Command,
prepare_command_prompt,
run_commands,
)
from metagpt.utils.common import CodeParser
@ -37,39 +40,12 @@ class TeamLeader(Role):
@model_validator(mode="after")
def set_plan(self) -> "TeamLeader":
self.rc.working_memory = (
self.rc.memory
) # TeamLeader does not need working memory, all messages should go into memory
self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=True)
return self
async def _run_env_command(self, cmd):
assert isinstance(self.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv"
if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name:
self.publish_message(Message(**cmd["args"]))
elif cmd["command_name"] == Command.ASK_HUMAN.cmd_name:
await self.rc.env.ask_human(sent_from=self, **cmd["args"])
elif cmd["command_name"] == Command.REPLY_TO_HUMAN.cmd_name:
await self.rc.env.reply_to_human(sent_from=self, **cmd["args"])
def _run_internal_command(self, cmd):
if cmd["command_name"] == Command.APPEND_TASK.cmd_name:
self.planner.plan.append_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.RESET_TASK.cmd_name:
self.planner.plan.reset_task(**cmd["args"])
elif cmd["command_name"] == Command.REPLACE_TASK.cmd_name:
self.planner.plan.replace_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.FINISH_CURRENT_TASK.cmd_name:
self.planner.plan.current_task.update_task_result(task_result=self.task_result)
self.planner.plan.finish_current_task()
self.rc.working_memory.clear()
async def run_commands(self, cmds):
print(*cmds, sep="\n")
for cmd in cmds:
await self._run_env_command(cmd)
self._run_internal_command(cmd)
if self.planner.plan.is_plan_finished():
self._set_state(-1)
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
@ -104,7 +80,7 @@ class TeamLeader(Role):
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
await self.run_commands(self.commands)
await run_commands(self, self.commands)
self.task_result = TaskResult(result="Success", is_success=True)
msg = Message(content="Commands executed", send_to="no one") # a dummy message to conform to the interface
self.rc.memory.add(msg)

View file

@ -416,11 +416,11 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
news = self.rc.msg_buffer.pop_all()
# Store the read messages in your own memory to prevent duplicate processing.
old_messages = [] if ignore_memory else self.rc.memory.get()
self.rc.memory.add_batch(news)
# Filter out messages of interest.
# Filter in messages of interest.
self.rc.news = [
n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
]
self.rc.memory.add_batch(self.rc.news) # only save messages of interest into memory
self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None # record the latest observed msg
# Design Rules:

View file

@ -2,6 +2,10 @@ from enum import Enum
from pydantic import BaseModel
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.roles import Role
from metagpt.schema import Message, Task
class CommandDef(BaseModel):
name: str
@ -66,3 +70,39 @@ def prepare_command_prompt(commands: list[Command]) -> str:
for i, command in enumerate(commands):
command_prompt += f"{i+1}. {command.value.signature}:\n{command.value.desc}\n\n"
return command_prompt
async def run_env_command(role: Role, cmd):
assert isinstance(role.rc.env, MGXEnv), "TeamLeader should only be used in an MGXEnv"
if cmd["command_name"] == Command.PUBLISH_MESSAGE.cmd_name:
role.publish_message(Message(**cmd["args"]))
if cmd["command_name"] == Command.ASK_HUMAN.cmd_name:
role.rc.working_memory.add(Message(content=cmd["args"]["question"], role="assistant"))
human_rsp = await role.rc.env.ask_human(sent_from=role, **cmd["args"])
role.rc.working_memory.add(Message(content=human_rsp, role="user"))
elif cmd["command_name"] == Command.REPLY_TO_HUMAN.cmd_name:
# TODO: consider if the message should go into memory
await role.rc.env.reply_to_human(sent_from=role, **cmd["args"])
def run_internal_command(role: Role, cmd):
if cmd["command_name"] == Command.APPEND_TASK.cmd_name:
role.planner.plan.append_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.RESET_TASK.cmd_name:
role.planner.plan.reset_task(**cmd["args"])
elif cmd["command_name"] == Command.REPLACE_TASK.cmd_name:
role.planner.plan.replace_task(Task(**cmd["args"]))
elif cmd["command_name"] == Command.FINISH_CURRENT_TASK.cmd_name:
role.planner.plan.current_task.update_task_result(task_result=role.task_result)
role.planner.plan.finish_current_task()
role.rc.working_memory.clear()
async def run_commands(role: Role, cmds):
print(*cmds, sep="\n")
for cmd in cmds:
await run_env_command(role, cmd)
run_internal_command(role, cmd)
if role.planner.plan.is_plan_finished():
role._set_state(-1)