mv utils out of rz

This commit is contained in:
garylin2099 2025-03-25 13:08:36 +00:00
parent ad334450f4
commit b19dee3303
3 changed files with 205 additions and 166 deletions

View file

@ -25,6 +25,7 @@ from metagpt.tools.libs.terminal import Terminal
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.common import CodeParser, awrite
from metagpt.utils.report import EditorReporter
from metagpt.utils.role_zero_utils import get_plan_status
@register_tool(include_functions=["write_new_code"])
@ -117,7 +118,7 @@ class Engineer2(RoleZero):
"""
# If the path is not absolute, try to fix it with the editor's working directory.
path = self.editor._try_fix_path(path)
plan_status, _ = self._get_plan_status()
plan_status, _ = get_plan_status(planner=self.planner)
prompt = WRITE_CODE_PROMPT.format(
user_requirement=self.planner.plan.goal,
plan_status=plan_status,

View file

@ -5,35 +5,28 @@ import json
import re
import traceback
from datetime import datetime
from typing import Annotated, Callable, Dict, List, Literal, Optional, Tuple
from typing import Annotated, Callable, Literal, Optional, Tuple
from pydantic import Field, model_validator
from metagpt.actions import Action, UserRequirement
from metagpt.actions.di.run_command import RunCommand
from metagpt.actions.search_enhanced_qa import SearchEnhancedQA
from metagpt.const import IMAGES
from metagpt.exp_pool import exp_cache
from metagpt.exp_pool.context_builders import RoleZeroContextBuilder
from metagpt.exp_pool.serializers import RoleZeroSerializer
from metagpt.logs import logger
from metagpt.memory.role_zero_memory import RoleZeroLongTermMemory
from metagpt.prompts.di.role_zero import (
ASK_HUMAN_COMMAND,
ASK_HUMAN_GUIDANCE_FORMAT,
CMD_PROMPT,
DETECT_LANGUAGE_PROMPT,
END_COMMAND,
JSON_REPAIR_PROMPT,
QUICK_RESPONSE_SYSTEM_PROMPT,
QUICK_THINK_EXAMPLES,
QUICK_THINK_PROMPT,
QUICK_THINK_SYSTEM_PROMPT,
QUICK_THINK_TAG,
REGENERATE_PROMPT,
REPORT_TO_HUMAN_PROMPT,
ROLE_INSTRUCTION,
SUMMARY_PROBLEM_WHEN_DUPLICATE,
SUMMARY_PROMPT,
SYSTEM_PROMPT,
)
@ -45,13 +38,17 @@ from metagpt.tools.libs.browser import Browser
from metagpt.tools.libs.editor import Editor
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.common import CodeParser, any_to_str, extract_and_encode_images
from metagpt.utils.repair_llm_raw_output import (
RepairType,
repair_escape_error,
repair_llm_raw_output,
)
from metagpt.utils.common import any_to_str
from metagpt.utils.report import ThoughtReporter
from metagpt.utils.role_zero_utils import (
check_duplicates,
format_terminal_output,
get_plan_status,
parse_browser_actions,
parse_commands,
parse_editor_result,
parse_images,
)
@register_tool(include_functions=["ask_human", "reply_to_human"])
@ -216,7 +213,7 @@ class RoleZero(Role):
example = self._retrieve_experience()
### 2. Plan Status ###
plan_status, current_task = self._get_plan_status()
plan_status, current_task = get_plan_status(planner=self.planner)
### 3. Tool/Command Info ###
tools = await self.tool_recommender.recommend_tools()
@ -242,9 +239,9 @@ class RoleZero(Role):
### Recent Observation ###
memory = self.rc.memory.get(self.memory_k)
memory = await self.parse_browser_actions(memory)
memory = await self.parse_editor_result(memory)
memory = self.parse_images(memory)
memory = await parse_browser_actions(memory, browser=self.browser)
memory = await parse_editor_result(memory)
memory = await parse_images(memory, llm=self.llm)
req = self.llm.format_msg(memory + [UserMessage(content=prompt)])
state_data = dict(
@ -255,7 +252,16 @@ class RoleZero(Role):
async with ThoughtReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "react"})
self.command_rsp = await self.llm_cached_aask(req=req, system_msgs=[system_prompt], state_data=state_data)
self.command_rsp = await self._check_duplicates(req, self.command_rsp)
rsp_hist = [mem.content for mem in self.rc.memory.get()]
self.command_rsp = await check_duplicates(
req=req,
command_rsp=self.command_rsp,
rsp_hist=rsp_hist,
llm=self.llm,
respond_language=self.respond_language,
)
return True
@exp_cache(context_builder=RoleZeroContextBuilder(), serializer=RoleZeroSerializer())
@ -267,44 +273,6 @@ class RoleZero(Role):
"""
return await self.llm.aask(req, system_msgs=system_msgs)
async def parse_browser_actions(self, memory: list[Message]) -> list[Message]:
if not self.browser.is_empty_page:
pattern = re.compile(r"Command Browser\.(\w+) executed")
for index, msg in zip(range(len(memory), 0, -1), memory[::-1]):
if pattern.search(msg.content):
memory.insert(index, UserMessage(cause_by="browser", content=await self.browser.view()))
break
return memory
async def parse_editor_result(self, memory: list[Message], keep_latest_count=5) -> list[Message]:
"""Retain the latest result and remove outdated editor results."""
pattern = re.compile(r"Command Editor\.(\w+?) executed")
new_memory = []
i = 0
for msg in reversed(memory):
matches = pattern.findall(msg.content)
if matches:
i += 1
if i > keep_latest_count:
new_content = msg.content[: msg.content.find("Command Editor")]
new_content += "\n".join([f"Command Editor.{match} executed." for match in matches])
msg = UserMessage(content=new_content)
new_memory.append(msg)
# Reverse the new memory list so the latest message is at the end
new_memory.reverse()
return new_memory
def parse_images(self, memory: list[Message]) -> list[Message]:
if not self.llm.support_image_input():
return memory
for msg in memory:
if IMAGES in msg.metadata or msg.role != "user":
continue
images = extract_and_encode_images(msg.content)
if images:
msg.add_metadata(IMAGES, images)
return memory
def _get_prefix(self) -> str:
time_info = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
return super()._get_prefix() + f" The current time is {time_info}."
@ -313,7 +281,9 @@ class RoleZero(Role):
if self.use_fixed_sop:
return await super()._act()
commands, ok, self.command_rsp = await self._parse_commands(self.command_rsp)
commands, ok, self.command_rsp = await parse_commands(
command_rsp=self.command_rsp, llm=self.llm, exclusive_tool_commands=self.exclusive_tool_commands
)
self.rc.memory.add(AIMessage(content=self.command_rsp))
if not ok:
error_msg = commands
@ -412,85 +382,6 @@ class RoleZero(Role):
return rsp_msg, intent_result
async def _check_duplicates(self, req: list[dict], command_rsp: str, check_window: int = 10):
past_rsp = [mem.content for mem in self.rc.memory.get(check_window)]
if command_rsp in past_rsp and '"command_name": "end"' not in command_rsp:
# Normal response with thought contents are highly unlikely to reproduce
# If an identical response is detected, it is a bad response, mostly due to LLM repeating generated content
# In this case, ask human for help and regenerate
# TODO: switch to llm_cached_aask
# Hard rule to ask human for help
if past_rsp.count(command_rsp) >= 3:
if '"command_name": "Plan.finish_current_task",' in command_rsp:
# Detect the duplicate of the 'Plan.finish_current_task' command, and use the 'end' command to finish the task.
logger.warning(f"Duplicate response detected: {command_rsp}")
return END_COMMAND
problem = await self.llm.aask(
req + [UserMessage(content=SUMMARY_PROBLEM_WHEN_DUPLICATE.format(language=self.respond_language))]
)
ASK_HUMAN_COMMAND[0]["args"]["question"] = ASK_HUMAN_GUIDANCE_FORMAT.format(problem=problem).strip()
ask_human_command = "```json\n" + json.dumps(ASK_HUMAN_COMMAND, indent=4, ensure_ascii=False) + "\n```"
return ask_human_command
# Try correction by self
logger.warning(f"Duplicate response detected: {command_rsp}")
regenerate_req = req + [UserMessage(content=REGENERATE_PROMPT)]
regenerate_req = self.llm.format_msg(regenerate_req)
command_rsp = await self.llm.aask(regenerate_req)
return command_rsp
async def _parse_commands(self, command_rsp) -> Tuple[List[Dict], bool]:
"""Retrieves commands from the Large Language Model (LLM).
This function attempts to retrieve a list of commands from the LLM by
processing the response (`self.command_rsp`). It handles potential errors
during parsing and LLM response formats.
Returns:
A tuple containing:
- A boolean flag indicating success (True) or failure (False).
"""
try:
commands = CodeParser.parse_code(block=None, lang="json", text=command_rsp)
if commands.endswith("]") and not commands.startswith("["):
commands = "[" + commands
commands = json.loads(repair_llm_raw_output(output=commands, req_keys=[None], repair_type=RepairType.JSON))
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse JSON for: {command_rsp}. Trying to repair...")
commands = await self.llm.aask(
msg=JSON_REPAIR_PROMPT.format(json_data=command_rsp, json_decode_error=str(e))
)
try:
commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=commands))
except json.JSONDecodeError:
# repair escape error of code and math
commands = CodeParser.parse_code(block=None, lang="json", text=command_rsp)
new_command = repair_escape_error(commands)
commands = json.loads(
repair_llm_raw_output(output=new_command, req_keys=[None], repair_type=RepairType.JSON)
)
except Exception as e:
tb = traceback.format_exc()
print(tb)
error_msg = str(e)
return error_msg, False, command_rsp
# 为了对LLM不按格式生成进行容错
if isinstance(commands, dict):
commands = commands["commands"] if "commands" in commands else [commands]
# Set the exclusive command flag to False.
command_flag = [command["command_name"] not in self.exclusive_tool_commands for command in commands]
if command_flag.count(False) > 1:
# Keep only the first exclusive command
index_of_first_exclusive = command_flag.index(False)
commands = commands[: index_of_first_exclusive + 1]
command_rsp = "```json\n" + json.dumps(commands, indent=4, ensure_ascii=False) + "\n```"
logger.info(
"exclusive command more than one in current command list. change the command list.\n" + command_rsp
)
return commands, True, command_rsp
async def _run_commands(self, commands) -> str:
outputs = []
for cmd in commands:
@ -552,36 +443,9 @@ class RoleZero(Role):
elif cmd["command_name"] == "Terminal.run_command":
tool_obj = self.tool_execution_map[cmd["command_name"]]
tool_output = await tool_obj(**cmd["args"])
if len(tool_output) <= 10:
command_output += (
f"\n[command]: {cmd['args']['cmd']} \n[command output] : {tool_output} (pay attention to this.)"
)
else:
command_output += f"\n[command]: {cmd['args']['cmd']} \n[command output] : {tool_output}"
command_output = format_terminal_output(cmd=cmd, raw_output=tool_output)
return command_output
def _get_plan_status(self) -> Tuple[str, str]:
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
current_task = (
self.planner.plan.current_task.model_dump(exclude=["code", "result", "is_success"])
if self.planner.plan.current_task
else ""
)
# format plan status
# Example:
# [GOAL] create a 2048 game
# [TASK_ID 1] (finished) Create a Product Requirement Document (PRD) for the 2048 game. This task depends on tasks[]. [Assign to Alice]
# [TASK_ID 2] ( ) Design the system architecture for the 2048 game. This task depends on tasks[1]. [Assign to Bob]
formatted_plan_status = f"[GOAL] {plan_status['goal']}\n"
if len(plan_status["tasks"]) > 0:
formatted_plan_status += "[Plan]\n"
for task in plan_status["tasks"]:
formatted_plan_status += f"[TASK_ID {task['task_id']}] ({'finished' if task['is_finished'] else ' '}){task['instruction']} This task depends on tasks{task['dependent_task_ids']}. [Assign to {task['assignee']}]\n"
else:
formatted_plan_status += "No Plan \n"
return formatted_plan_status, current_task
def _retrieve_experience(self) -> str:
"""Default implementation of experience retrieval. Can be overwritten in subclasses."""
context = [str(msg) for msg in self.rc.memory.get(self.memory_k)]

View file

@ -0,0 +1,174 @@
from __future__ import annotations
import json
import re
import traceback
from typing import Tuple
from metagpt.const import IMAGES
from metagpt.logs import logger
from metagpt.prompts.di.role_zero import (
ASK_HUMAN_COMMAND,
ASK_HUMAN_GUIDANCE_FORMAT,
END_COMMAND,
JSON_REPAIR_PROMPT,
REGENERATE_PROMPT,
SUMMARY_PROBLEM_WHEN_DUPLICATE,
)
from metagpt.schema import Message, UserMessage
from metagpt.utils.common import CodeParser, extract_and_encode_images
from metagpt.utils.repair_llm_raw_output import (
RepairType,
repair_escape_error,
repair_llm_raw_output,
)
async def parse_browser_actions(memory: list[Message], browser) -> list[Message]:
if not browser.is_empty_page:
pattern = re.compile(r"Command Browser\.(\w+) executed")
for index, msg in zip(range(len(memory), 0, -1), memory[::-1]):
if pattern.search(msg.content):
memory.insert(index, UserMessage(cause_by="browser", content=await browser.view()))
break
return memory
async def parse_editor_result(memory: list[Message], keep_latest_count=5) -> list[Message]:
"""Retain the latest result and remove outdated editor results."""
pattern = re.compile(r"Command Editor\.(\w+?) executed")
new_memory = []
i = 0
for msg in reversed(memory):
matches = pattern.findall(msg.content)
if matches:
i += 1
if i > keep_latest_count:
new_content = msg.content[: msg.content.find("Command Editor")]
new_content += "\n".join([f"Command Editor.{match} executed." for match in matches])
msg = UserMessage(content=new_content)
new_memory.append(msg)
# Reverse the new memory list so the latest message is at the end
new_memory.reverse()
return new_memory
async def parse_images(memory: list[Message], llm) -> list[Message]:
if not llm.support_image_input():
return memory
for msg in memory:
if IMAGES in msg.metadata or msg.role != "user":
continue
images = extract_and_encode_images(msg.content)
if images:
msg.add_metadata(IMAGES, images)
return memory
async def check_duplicates(
req: list[dict], command_rsp: str, rsp_hist: list[str], llm, respond_language: str, check_window: int = 10
) -> str:
past_rsp = rsp_hist[-check_window:]
if command_rsp in past_rsp and '"command_name": "end"' not in command_rsp:
# Normal response with thought contents are highly unlikely to reproduce
# If an identical response is detected, it is a bad response, mostly due to LLM repeating generated content
# In this case, ask human for help and regenerate
# TODO: switch to llm_cached_aask
# Hard rule to ask human for help
if past_rsp.count(command_rsp) >= 3:
if '"command_name": "Plan.finish_current_task",' in command_rsp:
# Detect the duplicate of the 'Plan.finish_current_task' command, and use the 'end' command to finish the task.
logger.warning(f"Duplicate response detected: {command_rsp}")
return END_COMMAND
problem = await llm.aask(
req + [UserMessage(content=SUMMARY_PROBLEM_WHEN_DUPLICATE.format(language=respond_language))]
)
ASK_HUMAN_COMMAND[0]["args"]["question"] = ASK_HUMAN_GUIDANCE_FORMAT.format(problem=problem).strip()
ask_human_command = "```json\n" + json.dumps(ASK_HUMAN_COMMAND, indent=4, ensure_ascii=False) + "\n```"
return ask_human_command
# Try correction by self
logger.warning(f"Duplicate response detected: {command_rsp}")
regenerate_req = req + [UserMessage(content=REGENERATE_PROMPT)]
regenerate_req = llm.format_msg(regenerate_req)
command_rsp = await llm.aask(regenerate_req)
return command_rsp
async def parse_commands(command_rsp: str, llm, exclusive_tool_commands: list[str]) -> Tuple[list[dict], bool]:
"""Retrieves commands from the Large Language Model (LLM).
This function attempts to retrieve a list of commands from the LLM by
processing the response (`command_rsp`). It handles potential errors
during parsing and LLM response formats.
Returns:
A tuple containing:
- A boolean flag indicating success (True) or failure (False).
"""
try:
commands = CodeParser.parse_code(block=None, lang="json", text=command_rsp)
if commands.endswith("]") and not commands.startswith("["):
commands = "[" + commands
commands = json.loads(repair_llm_raw_output(output=commands, req_keys=[None], repair_type=RepairType.JSON))
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse JSON for: {command_rsp}. Trying to repair...")
commands = await llm.aask(msg=JSON_REPAIR_PROMPT.format(json_data=command_rsp, json_decode_error=str(e)))
try:
commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=commands))
except json.JSONDecodeError:
# repair escape error of code and math
commands = CodeParser.parse_code(block=None, lang="json", text=command_rsp)
new_command = repair_escape_error(commands)
commands = json.loads(
repair_llm_raw_output(output=new_command, req_keys=[None], repair_type=RepairType.JSON)
)
except Exception as e:
tb = traceback.format_exc()
print(tb)
error_msg = str(e)
return error_msg, False, command_rsp
# 为了对LLM不按格式生成进行容错
if isinstance(commands, dict):
commands = commands["commands"] if "commands" in commands else [commands]
# Set the exclusive command flag to False.
command_flag = [command["command_name"] not in exclusive_tool_commands for command in commands]
if command_flag.count(False) > 1:
# Keep only the first exclusive command
index_of_first_exclusive = command_flag.index(False)
commands = commands[: index_of_first_exclusive + 1]
command_rsp = "```json\n" + json.dumps(commands, indent=4, ensure_ascii=False) + "\n```"
logger.info("exclusive command more than one in current command list. change the command list.\n" + command_rsp)
return commands, True, command_rsp
def get_plan_status(planner) -> Tuple[str, str]:
plan_status = planner.plan.model_dump(include=["goal", "tasks"])
current_task = (
planner.plan.current_task.model_dump(exclude=["code", "result", "is_success"])
if planner.plan.current_task
else ""
)
# format plan status
# Example:
# [GOAL] create a 2048 game
# [TASK_ID 1] (finished) Create a Product Requirement Document (PRD) for the 2048 game. This task depends on tasks[]. [Assign to Alice]
# [TASK_ID 2] ( ) Design the system architecture for the 2048 game. This task depends on tasks[1]. [Assign to Bob]
formatted_plan_status = f"[GOAL] {plan_status['goal']}\n"
if len(plan_status["tasks"]) > 0:
formatted_plan_status += "[Plan]\n"
for task in plan_status["tasks"]:
formatted_plan_status += f"[TASK_ID {task['task_id']}] ({'finished' if task['is_finished'] else ' '}){task['instruction']} This task depends on tasks{task['dependent_task_ids']}. [Assign to {task['assignee']}]\n"
else:
formatted_plan_status += "No Plan \n"
return formatted_plan_status, current_task
def format_terminal_output(cmd: dict, raw_output: str) -> str:
if len(raw_output) <= 10:
command_output = f"\n[command]: {cmd['args']['cmd']} \n[command output] : {raw_output} (pay attention to this.)"
else:
command_output = f"\n[command]: {cmd['args']['cmd']} \n[command output] : {raw_output}"
return command_output