mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-05-11 16:52:37 +02:00
Merge branch 'data_analyst_ldy' into 'mgx_ops'
Data analyst ldy See merge request pub/MetaGPT!189
This commit is contained in:
commit
e2ce006d15
17 changed files with 376 additions and 198 deletions
|
|
@ -1,151 +1,109 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Literal
|
||||
import re
|
||||
from typing import List
|
||||
|
||||
from pydantic import model_validator
|
||||
from pydantic import Field, model_validator
|
||||
|
||||
from metagpt.actions import Action
|
||||
from metagpt.actions.di.execute_nb_code import ExecuteNbCode
|
||||
from metagpt.actions.di.write_analysis_code import WriteAnalysisCode
|
||||
from metagpt.logs import logger
|
||||
from metagpt.prompts.di.data_analyst import CMD_PROMPT
|
||||
from metagpt.prompts.di.role_zero import JSON_REPAIR_PROMPT
|
||||
from metagpt.roles.di.data_interpreter import DataInterpreter
|
||||
from metagpt.schema import Message, TaskResult
|
||||
from metagpt.strategy.experience_retriever import KeywordExpRetriever
|
||||
from metagpt.strategy.planner import Planner
|
||||
from metagpt.strategy.thinking_command import (
|
||||
Command,
|
||||
prepare_command_prompt,
|
||||
run_commands,
|
||||
)
|
||||
from metagpt.tools.tool_recommend import BM25ToolRecommender
|
||||
from metagpt.utils.common import CodeParser
|
||||
from metagpt.utils.report import ThoughtReporter
|
||||
from metagpt.utils.repair_llm_raw_output import repair_llm_raw_output, RepairType
|
||||
from metagpt.prompts.di.data_analyst import BROWSER_INSTRUCTION, TASK_TYPE_DESC, CODE_STATUS, BROWSER_INFO
|
||||
from metagpt.prompts.di.role_zero import ROLE_INSTRUCTION
|
||||
from metagpt.roles.di.role_zero import RoleZero
|
||||
from metagpt.schema import TaskResult, Message
|
||||
from metagpt.strategy.experience_retriever import ExpRetriever, KeywordExpRetriever
|
||||
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
|
||||
from metagpt.tools.tool_registry import register_tool
|
||||
|
||||
|
||||
class DataAnalyst(DataInterpreter):
|
||||
@register_tool(include_functions=["write_and_exec_code"])
|
||||
class DataAnalyst(RoleZero):
|
||||
name: str = "David"
|
||||
profile: str = "DataAnalyst"
|
||||
goal: str = "Take on any data-related tasks, such as data analysis, machine learning, deep learning, web browsing, web scraping, web searching, web deployment, terminal operation, git and github operation, etc."
|
||||
react_mode: Literal["react"] = "react"
|
||||
max_react_loop: int = 20 # used for react mode
|
||||
task_result: TaskResult = None
|
||||
available_commands: list[Command] = [
|
||||
Command.APPEND_TASK,
|
||||
Command.RESET_TASK,
|
||||
Command.REPLACE_TASK,
|
||||
Command.FINISH_CURRENT_TASK,
|
||||
# Command.PUBLISH_MESSAGE,
|
||||
Command.ASK_HUMAN,
|
||||
Command.REPLY_TO_HUMAN,
|
||||
# Command.PASS,
|
||||
]
|
||||
commands: list[dict] = [] # issued commands to be executed
|
||||
user_requirement: str = ""
|
||||
instruction: str = ROLE_INSTRUCTION + BROWSER_INSTRUCTION
|
||||
task_type_desc: str = TASK_TYPE_DESC
|
||||
|
||||
tools: list[str] = ["Plan", "DataAnalyst", "RoleZero", "Browser"]
|
||||
custom_tools: list[str] = ["machine learning", "web scraping", "Terminal"]
|
||||
custom_tool_recommender: ToolRecommender = None
|
||||
experience_retriever: ExpRetriever = KeywordExpRetriever()
|
||||
|
||||
use_reflection: bool = True
|
||||
write_code: WriteAnalysisCode = Field(default_factory=WriteAnalysisCode, exclude=True)
|
||||
execute_code: ExecuteNbCode = Field(default_factory=ExecuteNbCode, exclude=True)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def set_plan_and_tool(self) -> "DataInterpreter":
|
||||
# We force using this parameter for DataAnalyst
|
||||
assert self.react_mode == "react"
|
||||
assert self.auto_run
|
||||
assert self.use_plan
|
||||
def set_custom_tool(self):
|
||||
if self.custom_tools and not self.custom_tool_recommender:
|
||||
self.custom_tool_recommender = BM25ToolRecommender(tools=self.custom_tools)
|
||||
|
||||
# Roughly the same part as DataInterpreter.set_plan_and_tool
|
||||
self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run)
|
||||
if self.tools and not self.tool_recommender:
|
||||
self.tool_recommender = BM25ToolRecommender(tools=self.tools)
|
||||
self.set_actions([WriteAnalysisCode])
|
||||
def _update_tool_execution(self):
|
||||
self.tool_execution_map.update({
|
||||
"DataAnalyst.write_and_exec_code": self.write_and_exec_code,
|
||||
})
|
||||
|
||||
# HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode
|
||||
self.planner = Planner(goal="", working_memory=self.rc.working_memory, auto_run=True)
|
||||
async def parse_browser_actions(self, memory: List[Message]) -> List[Message]:
|
||||
memory = await super().parse_browser_actions(memory)
|
||||
browser_actions = []
|
||||
for index, msg in enumerate(memory):
|
||||
if msg.cause_by == "browser":
|
||||
browser_url = re.search('URL: (.*?)\\n', msg.content).group(1)
|
||||
pattern = re.compile(r"Command Browser\.(\w+) executed")
|
||||
browser_actions.append({
|
||||
'command': pattern.match(memory[index - 1].content).group(1),
|
||||
'current url': browser_url
|
||||
})
|
||||
if browser_actions:
|
||||
browser_actions = BROWSER_INFO.format(browser_actions=browser_actions)
|
||||
self.rc.working_memory.add(Message(content=browser_actions, role="user", cause_by="browser"))
|
||||
return memory
|
||||
|
||||
return self
|
||||
async def write_and_exec_code(self):
|
||||
"""Write a code block for current task and execute it in an interactive notebook environment."""
|
||||
counter = 0
|
||||
success = False
|
||||
await self.execute_code.init_code()
|
||||
|
||||
async def _think(self) -> bool:
|
||||
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
|
||||
self._set_state(0)
|
||||
example = ""
|
||||
if not self.planner.plan.goal:
|
||||
self.user_requirement = self.get_memories()[-1].content
|
||||
self.planner.plan.goal = self.user_requirement
|
||||
example = KeywordExpRetriever().retrieve(self.user_requirement)
|
||||
# plan info
|
||||
plan_status = self.planner.get_plan_status()
|
||||
|
||||
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
|
||||
# for task in plan_status["tasks"]:
|
||||
# task.pop("code")
|
||||
# task.pop("result")
|
||||
prompt = CMD_PROMPT.format(
|
||||
plan_status=plan_status,
|
||||
example=example,
|
||||
available_commands=prepare_command_prompt(self.available_commands),
|
||||
)
|
||||
context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")])
|
||||
# print(*context, sep="\n" + "*" * 5 + "\n")
|
||||
async with ThoughtReporter(enable_llm_stream=True):
|
||||
rsp = await self.llm.aask(context)
|
||||
# tool info
|
||||
if self.custom_tool_recommender:
|
||||
plan = self.planner.plan
|
||||
fixed = ["Terminal"] if "Terminal" in self.custom_tools else None
|
||||
tool_info = await self.custom_tool_recommender.get_recommended_tool_info(fixed=fixed, plan=plan)
|
||||
else:
|
||||
tool_info = ""
|
||||
|
||||
# 临时方案,待role zero的版本完成可将本注释内的代码直接替换掉
|
||||
# -------------开始---------------
|
||||
try:
|
||||
commands = CodeParser.parse_code(block=None, lang="json", text=rsp)
|
||||
commands = json.loads(repair_llm_raw_output(output=commands, req_keys=[None], repair_type=RepairType.JSON))
|
||||
except json.JSONDecodeError as e:
|
||||
commands = await self.llm.aask(msg=JSON_REPAIR_PROMPT.format(json_data=rsp))
|
||||
commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=commands))
|
||||
except Exception as e:
|
||||
tb = traceback.format_exc()
|
||||
print(tb)
|
||||
while not success and counter < 3:
|
||||
### write code ###
|
||||
logger.info(f"ready to WriteAnalysisCode")
|
||||
use_reflection = (counter > 0 and self.use_reflection) # only use reflection after the first trial
|
||||
|
||||
# 为了对LLM不按格式生成进行容错
|
||||
if isinstance(commands, dict):
|
||||
commands = commands["commands"] if "commands" in commands else [commands]
|
||||
# -------------结束---------------
|
||||
code = await self.write_code.run(
|
||||
user_requirement=self.planner.plan.goal,
|
||||
plan_status=plan_status,
|
||||
tool_info=tool_info,
|
||||
working_memory=self.rc.working_memory.get(),
|
||||
use_reflection=use_reflection,
|
||||
)
|
||||
self.rc.working_memory.add(Message(content=code, role="assistant", cause_by=WriteAnalysisCode))
|
||||
|
||||
self.rc.working_memory.add(Message(content=rsp, role="assistant"))
|
||||
await run_commands(self, commands, self.rc.working_memory)
|
||||
return bool(self.rc.todo)
|
||||
### execute code ###
|
||||
result, success = await self.execute_code.run(code)
|
||||
print(result)
|
||||
|
||||
async def _act(self) -> Message:
|
||||
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
|
||||
logger.info(f"ready to take on task {self.planner.plan.current_task}")
|
||||
self.rc.working_memory.add(Message(content=result, role="user", cause_by=ExecuteNbCode))
|
||||
|
||||
# TODO: Consider an appropriate location to insert task experience formally
|
||||
experience = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task")
|
||||
if experience and experience not in [msg.content for msg in self.rc.working_memory.get()]:
|
||||
exp_msg = Message(content=experience, role="assistant")
|
||||
self.rc.working_memory.add(exp_msg)
|
||||
### process execution result ###
|
||||
counter += 1
|
||||
if success:
|
||||
task_result = TaskResult(code=code, result=result, is_success=success)
|
||||
self.planner.current_task.update_task_result(task_result)
|
||||
|
||||
code, result, is_success = await self._write_and_exec_code()
|
||||
self.planner.plan.current_task.is_success = (
|
||||
is_success # mark is_success, determine is_finished later in thinking
|
||||
)
|
||||
|
||||
# FIXME: task result is always overwritten by the last act, whereas it can be made of of multiple acts
|
||||
self.task_result = TaskResult(code=code, result=result, is_success=is_success)
|
||||
return Message(content="Task completed", role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode)
|
||||
|
||||
async def _react(self) -> Message:
|
||||
# NOTE: Diff 1: Each time landing here means observing news, set todo to allow news processing in _think
|
||||
self._set_state(0)
|
||||
|
||||
actions_taken = 0
|
||||
rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act
|
||||
while actions_taken < self.rc.max_react_loop:
|
||||
# NOTE: Diff 2: Keep observing within _react, news will go into memory, allowing adapting to new info
|
||||
# add news from self._observe, the one called in self.run, consider removing when switching from working_memory to memory
|
||||
self.working_memory.add_batch(self.rc.news)
|
||||
await self._observe()
|
||||
# add news from this self._observe, we need twice because _observe rewrites rc.news
|
||||
self.working_memory.add_batch(self.rc.news)
|
||||
|
||||
# think
|
||||
has_todo = await self._think()
|
||||
if not has_todo:
|
||||
break
|
||||
# act
|
||||
logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
|
||||
rsp = await self._act()
|
||||
actions_taken += 1
|
||||
return rsp # return output from the last action
|
||||
status = 'Success' if success else 'Failed'
|
||||
output = CODE_STATUS.format(code=code, status=status, result=result)
|
||||
self.rc.working_memory.clear()
|
||||
return output
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ class RoleZero(Role):
|
|||
system_msg: list[str] = None # Use None to conform to the default value at llm.aask
|
||||
cmd_prompt: str = CMD_PROMPT
|
||||
instruction: str = ROLE_INSTRUCTION
|
||||
task_type_desc: str = None
|
||||
|
||||
# React Mode
|
||||
react_mode: Literal["react"] = "react"
|
||||
|
|
@ -148,14 +149,10 @@ class RoleZero(Role):
|
|||
example=example,
|
||||
available_commands=tool_info,
|
||||
instruction=self.instruction.strip(),
|
||||
task_type_desc=self.task_type_desc,
|
||||
)
|
||||
memory = self.rc.memory.get(self.memory_k)
|
||||
if not self.browser.is_empty_page:
|
||||
pattern = re.compile(r"Command Browser\.(\w+) executed")
|
||||
for index, msg in zip(range(len(memory), 0, -1), memory[::-1]):
|
||||
if pattern.match(msg.content):
|
||||
memory.insert(index, UserMessage(cause_by="browser", content=await self.browser.view()))
|
||||
break
|
||||
memory = await self.parse_browser_actions(memory)
|
||||
context = self.llm.format_msg(memory + [UserMessage(content=prompt)])
|
||||
# print(*context, sep="\n" + "*" * 5 + "\n")
|
||||
async with ThoughtReporter(enable_llm_stream=True) as reporter:
|
||||
|
|
@ -165,6 +162,15 @@ class RoleZero(Role):
|
|||
|
||||
return True
|
||||
|
||||
async def parse_browser_actions(self, memory: List[Message]) -> List[Message]:
|
||||
if not self.browser.is_empty_page:
|
||||
pattern = re.compile(r"Command Browser\.(\w+) executed")
|
||||
for index, msg in zip(range(len(memory), 0, -1), memory[::-1]):
|
||||
if pattern.match(msg.content):
|
||||
memory.insert(index, UserMessage(cause_by="browser", content=await self.browser.view()))
|
||||
break
|
||||
return memory
|
||||
|
||||
async def _act(self) -> Message:
|
||||
if self.use_fixed_sop:
|
||||
return await super()._act()
|
||||
|
|
@ -267,13 +273,14 @@ class RoleZero(Role):
|
|||
async def _run_commands(self, commands) -> str:
|
||||
outputs = []
|
||||
for cmd in commands:
|
||||
output = f"Command {cmd['command_name']} executed"
|
||||
# handle special command first
|
||||
if await self._run_special_command(cmd):
|
||||
outputs.append(output)
|
||||
continue
|
||||
# run command as specified by tool_execute_map
|
||||
if cmd["command_name"] in self.tool_execution_map:
|
||||
tool_obj = self.tool_execution_map[cmd["command_name"]]
|
||||
output = f"Command {cmd['command_name']} executed"
|
||||
try:
|
||||
if inspect.iscoroutinefunction(tool_obj):
|
||||
tool_output = await tool_obj(**cmd["args"])
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue