Merge branch 'mgx_ops' into basic_ability

This commit is contained in:
garylin2099 2024-07-19 10:13:48 +08:00
commit 4cc47366f8
94 changed files with 3333 additions and 1200 deletions

View file

@ -1,151 +1,111 @@
from __future__ import annotations
import json
from typing import Literal
import re
from typing import List
from pydantic import model_validator
from pydantic import Field, model_validator
from metagpt.actions import Action
from metagpt.actions.di.execute_nb_code import ExecuteNbCode
from metagpt.actions.di.write_analysis_code import WriteAnalysisCode
from metagpt.logs import logger
from metagpt.prompts.di.data_analyst import CMD_PROMPT
from metagpt.prompts.di.role_zero import JSON_REPAIR_PROMPT
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.schema import Message, TaskResult
from metagpt.strategy.experience_retriever import KeywordExpRetriever
from metagpt.strategy.planner import Planner
from metagpt.strategy.thinking_command import (
Command,
prepare_command_prompt,
run_commands,
)
from metagpt.tools.tool_recommend import BM25ToolRecommender
from metagpt.utils.common import CodeParser
from metagpt.utils.report import ThoughtReporter
from metagpt.utils.repair_llm_raw_output import repair_llm_raw_output, RepairType
from metagpt.prompts.di.data_analyst import BROWSER_INSTRUCTION, TASK_TYPE_DESC, CODE_STATUS, BROWSER_INFO
from metagpt.prompts.di.role_zero import ROLE_INSTRUCTION
from metagpt.roles.di.role_zero import RoleZero
from metagpt.schema import TaskResult, Message
from metagpt.strategy.experience_retriever import ExpRetriever, KeywordExpRetriever
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.tools.tool_registry import register_tool
class DataAnalyst(DataInterpreter):
@register_tool(include_functions=["write_and_exec_code"])
class DataAnalyst(RoleZero):
name: str = "David"
profile: str = "DataAnalyst"
goal: str = "Take on any data-related tasks, such as data analysis, machine learning, deep learning, web browsing, web scraping, web searching, web deployment, terminal operation, git and github operation, etc."
react_mode: Literal["react"] = "react"
max_react_loop: int = 20 # used for react mode
task_result: TaskResult = None
available_commands: list[Command] = [
Command.APPEND_TASK,
Command.RESET_TASK,
Command.REPLACE_TASK,
Command.FINISH_CURRENT_TASK,
# Command.PUBLISH_MESSAGE,
Command.ASK_HUMAN,
Command.REPLY_TO_HUMAN,
# Command.PASS,
]
commands: list[dict] = [] # issued commands to be executed
user_requirement: str = ""
instruction: str = ROLE_INSTRUCTION + BROWSER_INSTRUCTION
task_type_desc: str = TASK_TYPE_DESC
tools: list[str] = ["Plan", "DataAnalyst", "RoleZero", "Browser"]
custom_tools: list[str] = ["machine learning", "web scraping", "Terminal"]
custom_tool_recommender: ToolRecommender = None
experience_retriever: ExpRetriever = KeywordExpRetriever()
use_reflection: bool = True
write_code: WriteAnalysisCode = Field(default_factory=WriteAnalysisCode, exclude=True)
execute_code: ExecuteNbCode = Field(default_factory=ExecuteNbCode, exclude=True)
@model_validator(mode="after")
def set_plan_and_tool(self) -> "DataInterpreter":
# We force using this parameter for DataAnalyst
assert self.react_mode == "react"
assert self.auto_run
assert self.use_plan
def set_custom_tool(self):
if self.custom_tools and not self.custom_tool_recommender:
self.custom_tool_recommender = BM25ToolRecommender(tools=self.custom_tools)
# Roughly the same part as DataInterpreter.set_plan_and_tool
self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run)
if self.tools and not self.tool_recommender:
self.tool_recommender = BM25ToolRecommender(tools=self.tools)
self.set_actions([WriteAnalysisCode])
# Add this role's own command to `tool_execution_map`: the command name
# "DataAnalyst.write_and_exec_code" is mapped to the bound
# `write_and_exec_code` method of this instance.
def _update_tool_execution(self):
self.tool_execution_map.update({
"DataAnalyst.write_and_exec_code": self.write_and_exec_code,
})
# HACK: Init Planner, control it through dynamic thinking; Consider formalizing as a react mode
self.planner = Planner(goal="", working_memory=self.rc.working_memory, auto_run=True)
async def parse_browser_actions(self, memory: List[Message]) -> List[Message]:
    """Record a summary of browser commands and their URLs in working memory.

    The parent implementation is called first and its result is scanned for
    messages whose ``cause_by`` is ``"browser"``. For each such message the
    URL is read from its content (text after ``"URL: "`` up to the next
    newline) and the command name is read from the content of the message
    immediately before it (``"Command Browser.<name> executed"``). When at
    least one pair is found, ``BROWSER_INFO`` is formatted with the collected
    list and added to ``self.rc.working_memory`` as a user message.

    Browser messages whose URL or preceding command cannot be parsed are
    skipped instead of raising ``AttributeError`` on a failed regex match, and
    a browser message at position 0 is skipped because it has no predecessor
    (``memory[-1]`` would otherwise silently be used).

    Args:
        memory: Messages to pass to the parent implementation and inspect.

    Returns:
        The message list returned by the parent implementation.
    """
    memory = await super().parse_browser_actions(memory)

    # Compile both patterns once instead of once per browser message.
    url_pattern = re.compile('URL: (.*?)\\n')
    command_pattern = re.compile(r"Command Browser\.(\w+) executed")

    browser_actions = []
    for index, msg in enumerate(memory):
        if msg.cause_by != "browser":
            continue
        if index == 0:
            # No preceding message to read the command name from.
            logger.debug("Browser message at position 0 has no preceding command; skipped")
            continue
        url_match = url_pattern.search(msg.content)
        command_match = command_pattern.match(memory[index - 1].content)
        if url_match is None or command_match is None:
            logger.debug("Could not parse browser URL or command name; browser message skipped")
            continue
        browser_actions.append({
            'command': command_match.group(1),
            'current url': url_match.group(1),
        })

    if browser_actions:
        browser_info = BROWSER_INFO.format(browser_actions=browser_actions)
        self.rc.working_memory.add(Message(content=browser_info, role="user", cause_by="browser"))
    return memory
return self
async def write_and_exec_code(self):
"""Write a code block for current task and execute it in an interactive notebook environment."""
counter = 0
success = False
await self.execute_code.init_code()
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
self._set_state(0)
example = ""
if not self.planner.plan.goal:
self.user_requirement = self.get_memories()[-1].content
self.planner.plan.goal = self.user_requirement
example = KeywordExpRetriever().retrieve(self.user_requirement)
# plan info
plan_status = self.planner.get_plan_status()
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
# for task in plan_status["tasks"]:
# task.pop("code")
# task.pop("result")
prompt = CMD_PROMPT.format(
plan_status=plan_status,
example=example,
available_commands=prepare_command_prompt(self.available_commands),
)
context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")])
# print(*context, sep="\n" + "*" * 5 + "\n")
async with ThoughtReporter(enable_llm_stream=True):
rsp = await self.llm.aask(context)
# tool info
if self.custom_tool_recommender:
plan = self.planner.plan
fixed = ["Terminal"] if "Terminal" in self.custom_tools else None
tool_info = await self.custom_tool_recommender.get_recommended_tool_info(fixed=fixed, plan=plan)
else:
tool_info = ""
# Temporary workaround: once the RoleZero version is finished, the code between these markers can be replaced directly
# -------------begin---------------
try:
commands = CodeParser.parse_code(block=None, lang="json", text=rsp)
commands = json.loads(repair_llm_raw_output(output=commands, req_keys=[None], repair_type=RepairType.JSON))
except json.JSONDecodeError as e:
commands = await self.llm.aask(msg=JSON_REPAIR_PROMPT.format(json_data=rsp))
commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=commands))
except Exception as e:
tb = traceback.format_exc()
print(tb)
while not success and counter < 3:
### write code ###
logger.info(f"ready to WriteAnalysisCode")
use_reflection = (counter > 0 and self.use_reflection) # only use reflection after the first trial
# Tolerate LLM output that does not follow the expected format
if isinstance(commands, dict):
commands = commands["commands"] if "commands" in commands else [commands]
# -------------end---------------
code = await self.write_code.run(
user_requirement=self.planner.plan.goal,
plan_status=plan_status,
tool_info=tool_info,
working_memory=self.rc.working_memory.get(),
use_reflection=use_reflection,
)
self.rc.working_memory.add(Message(content=code, role="assistant", cause_by=WriteAnalysisCode))
self.rc.working_memory.add(Message(content=rsp, role="assistant"))
await run_commands(self, commands, self.rc.working_memory)
return bool(self.rc.todo)
### execute code ###
result, success = await self.execute_code.run(code)
print(result)
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
logger.info(f"ready to take on task {self.planner.plan.current_task}")
self.rc.working_memory.add(Message(content=result, role="user", cause_by=ExecuteNbCode))
# TODO: Consider an appropriate location to insert task experience formally
experience = KeywordExpRetriever().retrieve(self.planner.plan.current_task.instruction, exp_type="task")
if experience and experience not in [msg.content for msg in self.rc.working_memory.get()]:
exp_msg = Message(content=experience, role="assistant")
self.rc.working_memory.add(exp_msg)
### process execution result ###
counter += 1
if success:
task_result = TaskResult(code=code, result=result, is_success=success)
self.planner.current_task.update_task_result(task_result)
code, result, is_success = await self._write_and_exec_code()
self.planner.plan.current_task.is_success = (
is_success # mark is_success, determine is_finished later in thinking
)
# FIXME: task result is always overwritten by the last act, whereas it can be made up of multiple acts
self.task_result = TaskResult(code=code, result=result, is_success=is_success)
return Message(content="Task completed", role="assistant", sent_from=self._setting, cause_by=WriteAnalysisCode)
# Run the observe -> think -> act loop until `_think` reports nothing to do or
# `self.rc.max_react_loop` actions have been taken. Returns the Message from
# the last `_act` call, or the "No actions taken yet" placeholder when the loop
# exits before any action runs.
async def _react(self) -> Message:
# NOTE: Diff 1: Each time landing here means observing news, set todo to allow news processing in _think
self._set_state(0)
actions_taken = 0
rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act
while actions_taken < self.rc.max_react_loop:
# NOTE: Diff 2: Keep observing within _react, news will go into memory, allowing adapting to new info
# add news from self._observe, the one called in self.run, consider removing when switching from working_memory to memory
self.working_memory.add_batch(self.rc.news)
await self._observe()
# add news from this self._observe, we need twice because _observe rewrites rc.news
self.working_memory.add_batch(self.rc.news)
# think: a falsy result means there is nothing left to do, so stop looping
has_todo = await self._think()
if not has_todo:
break
# act: only completed actions count towards the max_react_loop budget
logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
rsp = await self._act()
actions_taken += 1
return rsp # return output from the last action
status = 'Success' if success else 'Failed'
output = CODE_STATUS.format(code=code, status=status, result=result)
if success:
output += 'The code written has been executed successfully.'
self.rc.working_memory.clear()
return output

View file

@ -10,6 +10,9 @@ from pydantic import model_validator
from metagpt.actions import Action, UserRequirement
from metagpt.actions.di.run_command import RunCommand
from metagpt.exp_pool import exp_cache
from metagpt.exp_pool.context_builders import RoleZeroContextBuilder
from metagpt.exp_pool.serializers import RoleZeroSerializer
from metagpt.logs import logger
from metagpt.prompts.di.role_zero import (
CMD_PROMPT,
@ -41,6 +44,7 @@ class RoleZero(Role):
system_msg: list[str] = None # Use None to conform to the default value at llm.aask
cmd_prompt: str = CMD_PROMPT
instruction: str = ROLE_INSTRUCTION
task_type_desc: str = None
# React Mode
react_mode: Literal["react"] = "react"
@ -142,27 +146,50 @@ class RoleZero(Role):
tool_info = json.dumps({tool.name: tool.schemas for tool in tools})
### Make Decision Dynamically ###
instruction = self.instruction.strip()
prompt = self.cmd_prompt.format(
plan_status=plan_status,
current_task=current_task,
example=example,
available_commands=tool_info,
instruction=self.instruction.strip(),
task_type_desc=self.task_type_desc,
plan_status=plan_status,
current_task=current_task,
instruction=instruction,
)
memory = self.rc.memory.get(self.memory_k)
memory = await self.parse_browser_actions(memory)
req = self.llm.format_msg(memory + [UserMessage(content=prompt)])
async with ThoughtReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "react"})
state_data = dict(
plan_status=plan_status,
current_task=current_task,
instruction=instruction,
)
self.command_rsp = await self.llm_cached_aask(req=req, system_msgs=self.system_msg, state_data=state_data)
self.rc.memory.add(AIMessage(content=self.command_rsp))
return True
@exp_cache(context_builder=RoleZeroContextBuilder(), serializer=RoleZeroSerializer())
async def llm_cached_aask(self, *, req: list[dict], system_msgs: list[str], **kwargs) -> str:
"""Ask the LLM, using `exp_cache` to automatically manage experiences.

The `RoleZeroContextBuilder` attempts to add experiences to `req`.
The `RoleZeroSerializer` extracts essential parts of `req` for the experience pool, trimming lengthy entries to retain only necessary parts.

Args:
req: Formatted messages, passed positionally to `self.llm.aask`.
system_msgs: System messages, passed to `self.llm.aask` as `system_msgs`.
**kwargs: Accepted but not forwarded to `self.llm.aask`; presumably
consumed by the `exp_cache` decorator (e.g. `state_data`) — TODO confirm.

Returns:
The value returned by `self.llm.aask`.
"""
return await self.llm.aask(req, system_msgs=system_msgs)
async def parse_browser_actions(self, memory: List[Message]) -> List[Message]:
if not self.browser.is_empty_page:
pattern = re.compile(r"Command Browser\.(\w+) executed")
for index, msg in zip(range(len(memory), 0, -1), memory[::-1]):
if pattern.match(msg.content):
memory.insert(index, UserMessage(cause_by="browser", content=await self.browser.view()))
break
context = self.llm.format_msg(memory + [UserMessage(content=prompt)])
# print(*context, sep="\n" + "*" * 5 + "\n")
async with ThoughtReporter(enable_llm_stream=True):
self.command_rsp = await self.llm.aask(context, system_msgs=self.system_msg)
self.rc.memory.add(AIMessage(content=self.command_rsp))
return True
return memory
async def _act(self) -> Message:
if self.use_fixed_sop:
@ -217,12 +244,13 @@ class RoleZero(Role):
# routing
memory = self.get_memories(k=4)
context = self.llm.format_msg(memory + [UserMessage(content=QUICK_THINK_PROMPT)])
# print(context)
rsp = await self.llm.aask(context)
if "yes" in rsp.lower():
# llm call with the original context
answer = await self.llm.aask(self.llm.format_msg(memory))
async with ThoughtReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "quick"})
answer = await self.llm.aask(self.llm.format_msg(memory))
self.rc.memory.add(AIMessage(content=answer, cause_by=RunCommand))
await self.reply_to_human(content=answer)
rsp_msg = AIMessage(
@ -246,6 +274,8 @@ class RoleZero(Role):
"""
try:
commands = CodeParser.parse_code(block=None, lang="json", text=self.command_rsp)
if commands.endswith("]") and not commands.startswith("["):
commands = "[" + commands
commands = json.loads(repair_llm_raw_output(output=commands, req_keys=[None], repair_type=RepairType.JSON))
except json.JSONDecodeError:
logger.warning(f"Failed to parse JSON for: {self.command_rsp}. Trying to repair...")
@ -266,13 +296,15 @@ class RoleZero(Role):
async def _run_commands(self, commands) -> str:
outputs = []
for cmd in commands:
output = f"Command {cmd['command_name']} executed"
# handle special command first
if await self._run_special_command(cmd):
if self._is_special_command(cmd):
special_command_output = await self._run_special_command(cmd)
outputs.append(output + ":" + special_command_output)
continue
# run command as specified by tool_execute_map
if cmd["command_name"] in self.tool_execution_map:
tool_obj = self.tool_execution_map[cmd["command_name"]]
output = f"Command {cmd['command_name']} executed"
try:
if inspect.iscoroutinefunction(tool_obj):
tool_output = await tool_obj(**cmd["args"])
@ -293,19 +325,24 @@ class RoleZero(Role):
return outputs
async def _run_special_command(self, cmd) -> bool:
# Return True when the command's "command_name" is listed in
# `self.special_tool_commands`. Raises KeyError if `cmd` has no
# "command_name" key.
def _is_special_command(self, cmd) -> bool:
return cmd["command_name"] in self.special_tool_commands
async def _run_special_command(self, cmd) -> str:
"""command requiring special check or parsing"""
is_special_cmd = cmd["command_name"] in self.special_tool_commands
command_output = ""
if cmd["command_name"] == "Plan.finish_current_task" and not self.planner.plan.is_plan_finished():
# task_result = TaskResult(code=str(commands), result=outputs, is_success=is_success)
# self.planner.plan.current_task.update_task_result(task_result=task_result)
self.planner.plan.finish_current_task()
command_output = "Current task is finished. "
elif cmd["command_name"] == "end":
self._set_state(-1)
command_output = "Everything Done"
return is_special_cmd
return command_output
def _get_plan_status(self) -> Tuple[str, str]:
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])

View file

@ -1,5 +1,4 @@
import json
import os
from pydantic import Field
@ -10,6 +9,7 @@ from metagpt.prompts.di.swe_agent import (
SWE_AGENT_SYSTEM_TEMPLATE,
)
from metagpt.roles.di.role_zero import RoleZero
from metagpt.tools.libs.git import git_create_pull, git_push
from metagpt.tools.libs.terminal import Bash
@ -17,50 +17,46 @@ class SWEAgent(RoleZero):
name: str = "Swen"
profile: str = "Issue Solver"
goal: str = "Resolve GitHub issue"
_bash_window_size: int = 100
_system_msg: str = SWE_AGENT_SYSTEM_TEMPLATE
system_msg: list[str] = [_system_msg.format(WINDOW=_bash_window_size)]
system_msg: str = [SWE_AGENT_SYSTEM_TEMPLATE]
_instruction: str = NEXT_STEP_TEMPLATE
tools: list[str] = ["Bash", "Browser:goto,scroll"]
tools: list[str] = [
"Bash",
"Browser:goto,scroll",
"RoleZero",
"git_push",
"git_create_pull",
]
terminal: Bash = Field(default_factory=Bash, exclude=True)
output_diff: str = ""
max_react_loop: int = 40
run_eval: bool = False
async def _think(self) -> bool:
self._update_system_msg()
self._format_instruction()
await self._format_instruction()
res = await super()._think()
if self.run_eval:
await self._parse_commands_for_eval()
return res
def _update_system_msg(self):
"""
Sets the system message for the SWE agent.
# Add this role's commands to `tool_execution_map`: "Bash.run" is mapped to
# the `run` method of this agent's `terminal`, and "git_push" /
# "git_create_pull" are mapped to the functions of the same names.
def _update_tool_execution(self):
self.tool_execution_map.update(
{
"Bash.run": self.terminal.run,
"git_push": git_push,
"git_create_pull": git_create_pull,
}
)
Sets the `_bash_window_size` from the environment variable `WINDOW` if it exists.
Formats the `_system_msg` template with the current `_bash_window_size`.
"""
if os.getenv("WINDOW"):
self._bash_window_size = int(os.getenv("WINDOW"))
self.system_msg = [self._system_msg.format(WINDOW=self._bash_window_size)]
def _format_instruction(self):
async def _format_instruction(self):
"""
Formats the instruction message for the SWE agent.
Runs the "state" command in the terminal, parses its output as JSON,
and uses it to format the `_instruction` template.
"""
state_output = self.terminal.run("state")
state_output = await self.terminal.run("state")
bash_state = json.loads(state_output)
self.instruction = self._instruction.format(
WINDOW=self._bash_window_size, examples=MINIMAL_EXAMPLE, **bash_state
).strip()
return self.instruction
self.instruction = self._instruction.format(**bash_state).strip()
async def _parse_commands_for_eval(self):
"""
@ -81,7 +77,7 @@ class SWEAgent(RoleZero):
if "end" != cmd.get("command_name", ""):
return
try:
diff_output = self.terminal.run("git diff --cached")
diff_output = await self.terminal.run("git diff --cached")
clear_diff = extract_patch(diff_output)
logger.info(f"Diff output: \n{clear_diff}")
if clear_diff:
@ -90,8 +86,5 @@ class SWEAgent(RoleZero):
except Exception as e:
logger.error(f"Error during submission: {e}")
def _update_tool_execution(self):
self.tool_execution_map.update({"Bash.run": self.terminal.run})
# Return the fixed `MINIMAL_EXAMPLE` text as this agent's experience; no
# lookup is performed.
def _retrieve_experience(self) -> str:
return MINIMAL_EXAMPLE

View file

@ -15,7 +15,7 @@ from metagpt.tools.tool_registry import register_tool
@register_tool(include_functions=["publish_team_message"])
class TeamLeader(RoleZero):
name: str = "Tim"
name: str = "Mike"
profile: str = "Team Leader"
goal: str = "Manage a team to assist users"
system_msg: list[str] = [SYSTEM_PROMPT]