apply data_analyst to role_zero

This commit is contained in:
lidanyang 2024-06-27 11:23:00 +08:00
parent a2f809263a
commit ddaecf12eb
5 changed files with 83 additions and 117 deletions

View file

@ -5,7 +5,7 @@ When presented a current task, tackle the task using the available commands.
Pay close attention to new user message, review the conversation history, use RoleZero.reply_to_human to respond to new user requirement.
Note:
1. If you keeping encountering errors, unexpected situation, or you are not sure of proceeding, use RoleZero.ask_human to ask for help.
2. Carefully review your progress at the current task, if your actions so far has not fulfilled the task instruction, you should continue with current task. Otherwise, finish current task.
2. Carefully review your progress at the current task, if your actions so far has not fulfilled the task instruction, you should continue with current task. Otherwise, finish current task by Plan.finish_current_task.
3. Each time you finish a task, use RoleZero.reply_to_human to report your progress.
"""

View file

@ -28,7 +28,10 @@ your code
```
"""
REFLECTION_SYSTEM_MSG = """You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation."""
REFLECTION_SYSTEM_MSG = """
You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation.
When occuring ModuleNotFoundError, always install the required package. And use Terminal tool if available.
"""
DEBUG_REFLECTION_EXAMPLE = '''
[previous impl]:

View file

@ -1,134 +1,88 @@
from __future__ import annotations
import json
from typing import Literal
from pydantic import Field
from pydantic import model_validator
from metagpt.actions import Action
from metagpt.actions.di.execute_nb_code import ExecuteNbCode
from metagpt.actions.di.write_analysis_code import WriteAnalysisCode
from metagpt.logs import logger
from metagpt.prompts.di.data_analyst import CMD_PROMPT
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.schema import Message, TaskResult
from metagpt.strategy.experience_retriever import KeywordExpRetriever
from metagpt.strategy.planner import Planner
from metagpt.strategy.thinking_command import (
Command,
prepare_command_prompt,
run_commands,
)
from metagpt.tools.tool_recommend import BM25ToolRecommender
from metagpt.utils.common import CodeParser
from metagpt.utils.report import ThoughtReporter
from metagpt.roles.di.role_zero import RoleZero
from metagpt.schema import TaskResult, Message
from metagpt.tools.tool_registry import register_tool
class DataAnalyst(DataInterpreter):
@register_tool(include_functions=["write_and_exec_code"])
class DataAnalyst(RoleZero):
name: str = "David"
profile: str = "DataAnalyst"
goal: str = "Take on any data-related tasks, such as data analysis, machine learning, deep learning, web browsing, web scraping, web searching, web deployment, terminal operation, git and github operation, etc."
react_mode: Literal["react"] = "react"
max_react_loop: int = 20 # used for react mode
tools: list[str] = ["Plan", "DataAnalyst", "RoleZero"]
custom_tools: list[str] = ["machine learning", "web scraping", "Terminal"]
use_reflection: bool = True
write_code: WriteAnalysisCode = Field(default_factory=WriteAnalysisCode, exclude=True)
execute_code: ExecuteNbCode = Field(default_factory=ExecuteNbCode, exclude=True)
task_result: TaskResult = None
available_commands: list[Command] = [
Command.APPEND_TASK,
Command.RESET_TASK,
Command.REPLACE_TASK,
Command.FINISH_CURRENT_TASK,
# Command.PUBLISH_MESSAGE,
Command.ASK_HUMAN,
Command.REPLY_TO_HUMAN,
# Command.PASS,
]
commands: list[dict] = [] # issued commands to be executed
user_requirement: str = ""
@model_validator(mode="after")
def set_plan_and_tool(self) -> "DataInterpreter":
# We force using this parameter for DataAnalyst
assert self.react_mode == "react"
assert self.auto_run
assert self.use_plan
def _update_tool_execution(self):
self.tool_execution_map = {
"Plan.append_task": self.planner.plan.append_task,
"Plan.reset_task": self.planner.plan.reset_task,
"Plan.replace_task": self.planner.plan.replace_task,
"DataAnalyst.write_and_exec_code": self.write_and_exec_code,
"RoleZero.ask_human": self.ask_human,
"RoleZero.reply_to_human": self.reply_to_human,
}
# Roughly the same part as DataInterpreter.set_plan_and_tool
self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run)
if self.tools and not self.tool_recommender:
self.tool_recommender = BM25ToolRecommender(tools=self.tools)
self.set_actions([WriteAnalysisCode])
async def write_and_exec_code(self):
"""Write a code block for current task and execute it in an interactive notebook environment."""
counter = 0
success = False
# HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode
self.planner = Planner(goal="", working_memory=self.rc.working_memory, auto_run=True)
# plan info
plan_status = self.planner.get_plan_status()
return self
# tool info
if self.custom_tool_recommender:
plan = self.planner.plan
fix = ["Terminal"] if "Terminal" in self.custom_tools else None
tool_info = await self.custom_tool_recommender.get_recommended_tool_info(fix=fix, plan=plan)
else:
tool_info = ""
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
self._set_state(0)
example = ""
if not self.planner.plan.goal:
self.user_requirement = self.get_memories()[-1].content
self.planner.plan.goal = self.user_requirement
example = KeywordExpRetriever().retrieve(self.user_requirement)
while not success and counter < 3:
### write code ###
logger.info(f"ready to WriteAnalysisCode")
use_reflection = (counter > 0 and self.use_reflection) # only use reflection after the first trial
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
# for task in plan_status["tasks"]:
# task.pop("code")
# task.pop("result")
prompt = CMD_PROMPT.format(
plan_status=plan_status,
example=example,
available_commands=prepare_command_prompt(self.available_commands),
)
context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")])
# print(*context, sep="\n" + "*" * 5 + "\n")
async with ThoughtReporter(enable_llm_stream=True):
rsp = await self.llm.aask(context)
self.commands = json.loads(CodeParser.parse_code(block=None, lang='json', text=rsp))
self.rc.working_memory.add(Message(content=rsp, role="assistant"))
code = await self.write_code.run(
user_requirement=self.planner.plan.goal,
plan_status=plan_status,
tool_info=tool_info,
working_memory=self.rc.working_memory.get() if use_reflection else None,
use_reflection=use_reflection,
)
self.rc.working_memory.add(Message(content=code, role="assistant", cause_by=WriteAnalysisCode))
await run_commands(self, self.commands, self.rc.working_memory)
### execute code ###
result, success = await self.execute_code.run(code)
print(result)
return bool(self.rc.todo)
self.rc.working_memory.add(Message(content=result, role="user", cause_by=ExecuteNbCode))
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
logger.info(f"ready to take on task {self.planner.plan.current_task}")
### process execution result ###
counter += 1
self.task_result = TaskResult(code=code, result=result, is_success=success)
# TODO: Consider an appropriate location to insert task experience formally
experience = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task")
if experience and experience not in [msg.content for msg in self.rc.working_memory.get()]:
exp_msg = Message(content=experience, role="assistant")
self.rc.working_memory.add(exp_msg)
output = f"""
Code written:
{code}
Execution status:{'Success' if success else 'Failed'}
Execution result: {result}
"""
self.rc.working_memory.clear()
return output
code, result, is_success = await self._write_and_exec_code()
self.planner.plan.current_task.is_success = (
is_success # mark is_success, determine is_finished later in thinking
)
# FIXME: task result is always overwritten by the last act, whereas it can be made of of multiple acts
self.task_result = TaskResult(code=code, result=result, is_success=is_success)
return Message(content="Task completed", role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode)
async def _react(self) -> Message:
# NOTE: Diff 1: Each time landing here means observing news, set todo to allow news processing in _think
self._set_state(0)
actions_taken = 0
rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act
while actions_taken < self.rc.max_react_loop:
# NOTE: Diff 2: Keep observing within _react, news will go into memory, allowing adapting to new info
# add news from self._observe, the one called in self.run, consider removing when switching from working_memory to memory
self.working_memory.add_batch(self.rc.news)
await self._observe()
# add news from this self._observe, we need twice because _observe rewrites rc.news
self.working_memory.add_batch(self.rc.news)
# think
has_todo = await self._think()
if not has_todo:
break
# act
logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
rsp = await self._act()
actions_taken += 1
return rsp # return output from the last action
def _finish_current_task(self):
self.planner.current_task.update_task_result(self.task_result)
super()._finish_current_task()

View file

@ -41,8 +41,10 @@ class RoleZero(Role):
max_react_loop: int = 20 # used for react mode
# Tools
tools: list[str] = [] # Use special symbol ["<all>"] to indicate use of all registered tools
tools: list[str] = []
tool_recommender: ToolRecommender = None
custom_tools: list[str] = []
custom_tool_recommender: ToolRecommender = None
tool_execution_map: dict[str, Callable] = {}
special_tool_commands: list[str] = ["Plan.finish_current_task", "end"]
# Equipped with three basic tools by default for optional use
@ -68,6 +70,8 @@ class RoleZero(Role):
self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop)
if self.tools and not self.tool_recommender:
self.tool_recommender = BM25ToolRecommender(tools=self.tools, force=True)
if self.custom_tools and not self.custom_tool_recommender:
self.custom_tool_recommender = BM25ToolRecommender(tools=self.custom_tools)
self.set_actions([RunCommand])
# HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode
@ -235,13 +239,16 @@ class RoleZero(Role):
if cmd["command_name"] == "Plan.finish_current_task" and not self.planner.plan.is_plan_finished():
# task_result = TaskResult(code=str(commands), result=outputs, is_success=is_success)
# self.planner.plan.current_task.update_task_result(task_result=task_result)
self.planner.plan.finish_current_task()
self._finish_current_task()
elif cmd["command_name"] == "end":
self._set_state(-1)
return is_special_cmd
def _finish_current_task(self):
self.planner.plan.finish_current_task()
def _get_plan_status(self) -> Tuple[str, str]:
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
for task in plan_status["tasks"]:

View file

@ -101,7 +101,7 @@ class ToolRecommender(BaseModel):
return ranked_tools
async def get_recommended_tool_info(self, **kwargs) -> str:
async def get_recommended_tool_info(self, fix: list[str] = None, **kwargs) -> str:
"""
Wrap recommended tools with their info in a string, which can be used directly in a prompt.
"""
@ -109,6 +109,8 @@ class ToolRecommender(BaseModel):
if not recommended_tools:
return ""
tool_schemas = {tool.name: tool.schemas for tool in recommended_tools}
if fix:
tool_schemas.update({tool.name: tool.schemas for tool in self.tools.values() if tool.name in fix})
return TOOL_INFO_PROMPT.format(tool_schemas=tool_schemas)
async def recall_tools(self, context: str = "", plan: Plan = None, topk: int = 20) -> list[Tool]: