disentangle planner and tool module, optimize tool module, add react mode

This commit is contained in:
yzlin 2024-03-07 21:22:44 +08:00
parent 0a2273c7a0
commit 0116de01b9
20 changed files with 554 additions and 354 deletions

View file

@ -7,168 +7,57 @@
from __future__ import annotations
import json
from typing import Tuple
from metagpt.actions import Action
from metagpt.logs import logger
from metagpt.prompts.mi.ml_action import MODEL_TRAIN_EXAMPLE, USE_ML_TOOLS_EXAMPLE
from metagpt.prompts.mi.write_analysis_code import (
CHECK_DATA_PROMPT,
DEBUG_REFLECTION_EXAMPLE,
INTERPRETER_SYSTEM_MSG,
REFLECTION_PROMPT,
REFLECTION_SYSTEM_MSG,
STRUCTUAL_PROMPT,
TOOL_RECOMMENDATION_PROMPT,
)
from metagpt.schema import Message, Plan
from metagpt.tools import TOOL_REGISTRY
from metagpt.tools.tool_registry import validate_tool_names
from metagpt.tools.tool_type import ToolType
from metagpt.utils.common import CodeParser, process_message, remove_comments
class WriteCodeWithTools(Action):
"""Write code with help of local available tools. Choose tools first, then generate code to use the tools"""
use_tools: bool = True
# selected tools to choose from, listed by their names. An empty list means selection from all tools.
selected_tools: list[str] = []
def _get_tools_by_type(self, tool_type: str) -> dict:
"""
Retreive tools by tool type from registry, but filtered by pre-selected tool list
Args:
tool_type (str): Tool type to retrieve from the registry
Returns:
dict: A dict of tool name to Tool object, representing available tools under the type
"""
candidate_tools = TOOL_REGISTRY.get_tools_by_type(tool_type)
if self.selected_tools:
candidate_tool_names = set(self.selected_tools) & candidate_tools.keys()
candidate_tools = {tool_name: candidate_tools[tool_name] for tool_name in candidate_tool_names}
return candidate_tools
async def _recommend_tool(
self,
task: str,
available_tools: dict,
) -> dict:
"""
Recommend tools for the specified task.
Args:
task (str): the task to recommend tools for
available_tools (dict): the available tools description
Returns:
dict: schemas of recommended tools for the specified task
"""
prompt = TOOL_RECOMMENDATION_PROMPT.format(
current_task=task,
available_tools=available_tools,
)
rsp = await self._aask(prompt)
rsp = CodeParser.parse_code(block=None, text=rsp)
recommend_tools = json.loads(rsp)
logger.info(f"Recommended tools: \n{recommend_tools}")
# Parses and validates the recommended tools, for LLM might hallucinate and recommend non-existing tools
valid_tools = validate_tool_names(recommend_tools, return_tool_object=True)
tool_schemas = {tool.name: tool.schemas for tool in valid_tools}
return tool_schemas
async def _prepare_tools(self, plan: Plan) -> Tuple[dict, str, str]:
"""Prepare tool schemas and usage instructions according to current task
Args:
plan (Plan): The overall plan containing task information.
Returns:
Tuple[dict, str, str]: A tool schemas ({tool_name: tool_schema_dict}), a usage prompt for the type of tools selected, and examples of using the tools
"""
if not self.use_tools:
return {}, "", ""
# find tool type from task type through exact match, can extend to retrieval in the future
tool_type = plan.current_task.task_type
# prepare tool-type-specific instruction
tool_type_usage_prompt = (
TOOL_REGISTRY.get_tool_type(tool_type).usage_prompt if TOOL_REGISTRY.has_tool_type(tool_type) else ""
)
# ML-specific tool usage examples
examples = ""
if plan.current_task.task_type in [
ToolType.DATA_PREPROCESS.type_name,
ToolType.FEATURE_ENGINEERING.type_name,
]:
examples = USE_ML_TOOLS_EXAMPLE
elif plan.current_task.task_type in [ToolType.MODEL_TRAIN.type_name]:
examples = MODEL_TRAIN_EXAMPLE
# prepare schemas of available tools
tool_schemas = {}
available_tools = self._get_tools_by_type(tool_type)
if available_tools:
available_tools = {tool_name: tool.schemas["description"] for tool_name, tool in available_tools.items()}
tool_schemas = await self._recommend_tool(plan.current_task.instruction, available_tools)
return tool_schemas, tool_type_usage_prompt, examples
async def _debug_with_reflection(self, context: list[Message], working_memory: list[Message]):
reflection_prompt = REFLECTION_PROMPT.format(
debug_example=DEBUG_REFLECTION_EXAMPLE,
context=context,
previous_impl=working_memory,
)
# print(reflection_prompt)
system_prompt = "You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation "
rsp = await self._aask(reflection_prompt, system_msgs=[system_prompt])
rsp = await self._aask(reflection_prompt, system_msgs=[REFLECTION_SYSTEM_MSG])
reflection = json.loads(CodeParser.parse_code(block=None, text=rsp))
return reflection["improved_impl"]
async def run(
self,
plan: Plan,
working_memory: list[Message] = [],
user_requirement: str,
plan_status: str = "",
tool_info: str = "",
working_memory: list[Message] = None,
use_reflection: bool = False,
**kwargs,
) -> str:
# prepare tool schemas and tool-type-specific instruction
tool_schemas, tool_type_usage_prompt, examples = await self._prepare_tools(plan=plan)
# necessary components to be used in prompt
finished_tasks = plan.get_finished_tasks()
code_written = [remove_comments(task.code) for task in finished_tasks]
code_written = "\n\n".join(code_written)
task_results = [task.result for task in finished_tasks]
task_results = "\n\n".join(task_results)
# structure prompt
structual_prompt = STRUCTUAL_PROMPT.format(
user_requirement=plan.goal,
code_written=code_written,
task_results=task_results,
current_task=plan.current_task.instruction,
tool_type_usage_prompt=tool_type_usage_prompt,
tool_schemas=tool_schemas,
examples=examples,
user_requirement=user_requirement,
plan_status=plan_status,
tool_info=tool_info,
)
working_memory = working_memory or []
context = [Message(content=structual_prompt, role="user")] + working_memory
context = process_message(context)
# temp = context + working_memory
# print(*temp, sep="***\n\n***")
# LLM call
if not use_reflection:
rsp = await self.llm.aask(context, **kwargs)
rsp = await self.llm.aask(context, system_msgs=[INTERPRETER_SYSTEM_MSG], **kwargs)
code = CodeParser.parse_code(block=None, text=rsp)
else:

View file

@ -13,7 +13,7 @@ from typing import Tuple
from metagpt.actions import Action
from metagpt.logs import logger
from metagpt.schema import Message, Plan, Task
from metagpt.tools import TOOL_REGISTRY
from metagpt.strategy.task_type import TaskType
from metagpt.utils.common import CodeParser
@ -43,7 +43,7 @@ class WritePlan(Action):
async def run(self, context: list[Message], max_tasks: int = 5, use_tools: bool = False) -> str:
task_type_desc = "\n".join(
[f"- **{tool_type.name}**: {tool_type.desc}" for tool_type in TOOL_REGISTRY.get_tool_types().values()]
[f"- **{tt.type_name}**: {tt.value.desc}" for tt in TaskType]
) # task type are binded with tool type now, should be improved in the future
prompt = self.PROMPT_TEMPLATE.format(
context="\n".join([str(ct) for ct in context]), max_tasks=max_tasks, task_type_desc=task_type_desc

View file

@ -1,56 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/11/24 15:43
# @Author : lidanyang
# @File : ml_action
# @Desc :
MODEL_TRAIN_EXAMPLE = """
when current task is "train a lightgbm model on training data", the code can be like:
```python
# Step 1: check data type and convert to numeric
obj_cols = train.select_dtypes(include='object').columns.tolist()
for col in obj_cols:
encoder = LabelEncoder()
train[col] = encoder.fit_transform(train[col].unique().tolist() + ['unknown'])
test[col] = test[col].apply(lambda x: x if x in encoder.classes_ else 'unknown')
test[col] = encoder.transform(test[col])
# Step 2: train lightgbm model
model = LGBMClassifier()
model.fit(train, y_train)
```end
# Constraints:
- Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed.
"""
USE_ML_TOOLS_EXAMPLE = """
when current task is "do data preprocess, like fill missing value, handle outliers, etc.", the code can be like:
```python
# Step 1: fill missing value
# Tools used: ['FillMissingValue']
from metagpt.tools.libs.data_preprocess import FillMissingValue
train_processed = train.copy()
test_processed = test.copy()
num_cols = train_processed.select_dtypes(include='number').columns.tolist()
if 'label' in num_cols:
num_cols.remove('label')
fill_missing_value = FillMissingValue(features=num_cols, strategy='mean')
fill_missing_value.fit(train_processed)
train_processed = fill_missing_value.transform(train_processed)
test_processed = fill_missing_value.transform(test_processed)
# Step 2: handle outliers
for col in num_cols:
low, high = train_processed[col].quantile([0.01, 0.99])
train_processed[col] = train_processed[col].clip(low, high)
test_processed[col] = test_processed[col].clip(low, high)
```end
# Constraints:
- Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed.
- Always prioritize using pre-defined tools for the same functionality.
- Always copy the DataFrame before processing it and use the copy to process.
"""

View file

@ -1,33 +1,19 @@
INTERPRETER_SYSTEM_MSG = """As a data scientist, you need to help user to achieve their goal step by step in a continuous Jupyter notebook. Since it is a notebook environment, don't use asyncio.run. Instead, use await if you need to call an async function."""
STRUCTUAL_PROMPT = """
# Background
As a data scientist, you need to help user to achieve their goal [{user_requirement}] step-by-step in an continuous Jupyter notebook. Since it is a notebook environment, don't use asyncio.run. Instead, use await if you need to call an async function.
# User Requirement
{user_requirement}
# Finished Tasks
## code
```python
{code_written}
```
# Plan Status
{plan_status}
## execution result
{task_results}
# Tool Info
{tool_info}
# Current Task
{current_task}
# Instruction
Write complete code for 'Current Task'. And avoid duplicating code from 'Finished Tasks', such as repeated import of packages, reading data, etc.
Specifically, {tool_type_usage_prompt}
# Capabilities
- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python class or function.
- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc..
# Available Tools:
Each tool is described in JSON format. When you call a tool, import the tool from its path first.
{tool_schemas}
# Examples
{examples}
# Constraints
- Take on Current Task if it is in Plan Status, otherwise, tackle User Requirement directly.
- Ensure the output new code is executable in the same Jupyter notebook as the previous executed code.
- Always prioritize using pre-defined tools for the same functionality.
# Output
Output code in the following format:
@ -36,6 +22,8 @@ your code
```
"""
REFLECTION_SYSTEM_MSG = """You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation."""
DEBUG_REFLECTION_EXAMPLE = '''
[previous impl]:
assistant:
@ -122,24 +110,3 @@ DATA_INFO = """
Latest data info after previous tasks:
{info}
"""
TOOL_RECOMMENDATION_PROMPT = """
## User Requirement:
{current_task}
## Task
Recommend up to five tools from 'Available Tools' that can help solve the 'User Requirement'.
## Available Tools:
{available_tools}
## Tool Selection and Instructions:
- Select tools most relevant to completing the 'User Requirement'.
- If you believe that no tools are suitable, indicate with an empty list.
- Only list the names of the tools, not the full schema of each tool.
- Ensure selected tools are listed in 'Available Tools'.
- Output a json list of tool names:
```json
["tool_name1", "tool_name2", ...]
```
"""

View file

@ -1,11 +1,11 @@
# Prompt for using tools of "eda" type
# Prompt for taking on "eda" tasks
EDA_PROMPT = """
The current task is about exploratory data analysis, please note the following:
- Distinguish column types with `select_dtypes` for tailored analysis and visualization, such as correlation.
- Remember to `import numpy as np` before using Numpy functions.
"""
# Prompt for using tools of "data_preprocess" type
# Prompt for taking on "data_preprocess" tasks
DATA_PREPROCESS_PROMPT = """
The current task is about data preprocessing, please note the following:
- Monitor data types per column, applying appropriate methods.
@ -15,9 +15,10 @@ The current task is about data preprocessing, please note the following:
- Prefer alternatives to one-hot encoding for categorical data.
- Only encode or scale necessary columns to allow for potential feature-specific engineering tasks (like time_extract, binning, extraction, etc.) later.
- Each step do data preprocessing to train, must do same for test separately at the same time.
- Always copy the DataFrame before processing it and use the copy to process.
"""
# Prompt for using tools of "feature_engineering" type
# Prompt for taking on "feature_engineering" tasks
FEATURE_ENGINEERING_PROMPT = """
The current task is about feature engineering. when performing it, please adhere to the following principles:
- Generate as diverse features as possible to improve the model's performance step-by-step.
@ -27,9 +28,10 @@ The current task is about feature engineering. when performing it, please adhere
- Each feature engineering operation performed on the train set must also applies to the test separately at the same time.
- Avoid using the label column to create features, except for cat encoding.
- Use the data from previous task result if exist, do not mock or reload data yourself.
- Always copy the DataFrame before processing it and use the copy to process.
"""
# Prompt for using tools of "model_train" type
# Prompt for taking on "model_train" tasks
MODEL_TRAIN_PROMPT = """
The current task is about training a model, please ensure high performance:
- Keep in mind that your user prioritizes results and is highly focused on model performance. So, when needed, feel free to use models of any complexity to improve effectiveness, such as XGBoost, CatBoost, etc.
@ -38,14 +40,14 @@ The current task is about training a model, please ensure high performance:
- Set suitable hyperparameters for the model, make metrics as high as possible.
"""
# Prompt for using tools of "model_evaluate" type
# Prompt for taking on "model_evaluate" tasks
MODEL_EVALUATE_PROMPT = """
The current task is about evaluating a model, please note the following:
- Ensure that the evaluated data is same processed as the training data. If not, remember use object in 'Done Tasks' to transform the data.
- Use trained model from previous task result directly, do not mock or reload model yourself.
"""
# Prompt for using tools of "vision" type
# Prompt for taking on "image2webpage" tasks
IMAGE2WEBPAGE_PROMPT = """
The current task is about converting image into webpage code. please note the following:
- Single-Step Code Generation: Execute the entire code generation process in a single step, encompassing HTML, CSS, and JavaScript. Avoid fragmenting the code generation into multiple separate steps to maintain consistency and simplify the development workflow.

View file

@ -1,6 +1,9 @@
from __future__ import annotations
from pydantic import Field
import json
from typing import Literal, Union
from pydantic import Field, model_validator
from metagpt.actions.mi.ask_review import ReviewConst
from metagpt.actions.mi.execute_nb_code import ExecuteNbCode
@ -9,40 +12,80 @@ from metagpt.logs import logger
from metagpt.prompts.mi.write_analysis_code import DATA_INFO
from metagpt.roles import Role
from metagpt.schema import Message, Task, TaskResult
from metagpt.tools.tool_type import ToolType
from metagpt.strategy.task_type import TaskType
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.utils.common import CodeParser
REACT_THINK_PROMPT = """
# User Requirement
{user_requirement}
# Context
{context}
Output a json following the format:
```json
{{
"thoughts": str = "Thoughts on current situation, reflect on how you should proceed to fulfill the user requirement",
"state": bool = "Decide whether you need to take more actions to complete the user requirement. Return true if you think so. Return false if you think the requirement has been completely fulfilled."
}}
```
"""
class Interpreter(Role):
name: str = "Ivy"
profile: str = "Interpreter"
auto_run: bool = True
use_tools: bool = False
use_plan: bool = True
use_reflection: bool = False
execute_code: ExecuteNbCode = Field(default_factory=ExecuteNbCode, exclude=True)
tools: list[str] = []
tools: Union[str, list[str]] = []
tool_recommender: ToolRecommender = None
react_mode: Literal["plan_and_act", "react"] = "plan_and_act"
max_react_loop: int = 10 # used for react mode
def __init__(
self,
auto_run=True,
use_tools=False,
tools=[],
**kwargs,
):
super().__init__(auto_run=auto_run, use_tools=use_tools, tools=tools, **kwargs)
self._set_react_mode(react_mode="plan_and_act", auto_run=auto_run, use_tools=use_tools)
if use_tools and tools:
from metagpt.tools.tool_registry import (
validate_tool_names, # import upon use
)
self.tools = validate_tool_names(tools)
logger.info(f"will only use {self.tools} as tools")
@model_validator(mode="after")
def set_plan_and_tool(self) -> "Interpreter":
self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run)
self.use_plan = (
self.react_mode == "plan_and_act"
) # create a flag for convenience, overwrite any passed-in value
if self.tools:
self.tool_recommender = BM25ToolRecommender(tools=self.tools)
self.set_actions([WriteCodeWithTools])
return self
@property
def working_memory(self):
return self.rc.working_memory
async def _think(self) -> bool:
"""Useful in 'react' mode. Use LLM to decide whether and what to do next."""
user_requirement = self.get_memories()[0].content
context = self.working_memory.get()
if not context:
# just started the run, we need action certainly
self.working_memory.add(self.get_memories()[0]) # add user requirement to working memory
self._set_state(0)
return True
prompt = REACT_THINK_PROMPT.format(user_requirement=user_requirement, context=context)
rsp = await self.llm.aask(prompt)
rsp_dict = json.loads(CodeParser.parse_code(block=None, text=rsp))
self.working_memory.add(Message(content=rsp_dict["thoughts"], role="assistant"))
need_action = rsp_dict["state"]
self._set_state(0) if need_action else self._set_state(-1)
return need_action
async def _act(self) -> Message:
"""Useful in 'react' mode. Return a Message conforming to Role._act interface."""
code, _, _ = await self._write_and_exec_code()
return Message(content=code, role="assistant", cause_by=WriteCodeWithTools)
async def _act_on_task(self, current_task: Task) -> TaskResult:
"""Useful in 'plan_and_act' mode. Wrap the output in a TaskResult for review and confirmation."""
code, result, is_success = await self._write_and_exec_code()
task_result = TaskResult(code=code, result=result, is_success=is_success)
return task_result
@ -51,11 +94,25 @@ class Interpreter(Role):
counter = 0
success = False
# plan info
plan_status = self.planner.get_plan_status() if self.use_plan else ""
# tool info
if self.tools:
context = (
self.working_memory.get()[-1].content if self.working_memory.get() else ""
) # thoughts from _think stage in 'react' mode
plan = self.planner.plan if self.use_plan else None
tool_info = await self.tool_recommender.get_recommended_tool_info(context=context, plan=plan)
else:
tool_info = ""
# data info
await self._check_data()
while not success and counter < max_retry:
### write code ###
code, cause_by = await self._write_code(counter)
code, cause_by = await self._write_code(counter, plan_status, tool_info)
self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by))
@ -76,22 +133,33 @@ class Interpreter(Role):
return code, result, success
async def _write_code(self, counter):
todo = WriteCodeWithTools(use_tools=self.use_tools, selected_tools=self.tools)
async def _write_code(
self,
counter,
plan_status="",
tool_info="",
):
todo = WriteCodeWithTools()
logger.info(f"ready to {todo.name}")
use_reflection = counter > 0 and self.use_reflection
user_requirement = self.get_memories()[0].content
code = await todo.run(
plan=self.planner.plan, working_memory=self.working_memory.get(), use_reflection=use_reflection
user_requirement=user_requirement,
plan_status=plan_status,
tool_info=tool_info,
working_memory=self.working_memory.get(),
use_reflection=use_reflection,
)
return code, todo
async def _check_data(self):
current_task = self.planner.plan.current_task
if current_task.task_type not in [
ToolType.DATA_PREPROCESS.type_name,
ToolType.FEATURE_ENGINEERING.type_name,
ToolType.MODEL_TRAIN.type_name,
if not self.use_plan or self.planner.plan.current_task.task_type not in [
TaskType.DATA_PREPROCESS.type_name,
TaskType.FEATURE_ENGINEERING.type_name,
TaskType.MODEL_TRAIN.type_name,
]:
return
logger.info("Check updated data")

View file

@ -283,7 +283,7 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
self.actions.append(i)
self.states.append(f"{len(self.actions)}. {action}")
def _set_react_mode(self, react_mode: str, max_react_loop: int = 1, auto_run: bool = True, use_tools: bool = False):
def _set_react_mode(self, react_mode: str, max_react_loop: int = 1, auto_run: bool = True):
"""Set strategy of the Role reacting to observed Message. Variation lies in how
this Role elects action to perform during the _think stage, especially if it is capable of multiple Actions.
@ -304,9 +304,7 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
if react_mode == RoleReactMode.REACT:
self.rc.max_react_loop = max_react_loop
elif react_mode == RoleReactMode.PLAN_AND_ACT:
self.planner = Planner(
goal=self.goal, working_memory=self.rc.working_memory, auto_run=auto_run, use_tools=use_tools
)
self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=auto_run)
def _watch(self, actions: Iterable[Type[Action]] | Iterable[Action]):
"""Watch Actions of interest. Role will select Messages caused by these Actions from its personal message

View file

@ -13,6 +13,8 @@ from metagpt.actions.mi.write_plan import (
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.schema import Message, Plan, Task, TaskResult
from metagpt.strategy.task_type import TaskType
from metagpt.utils.common import remove_comments
STRUCTURAL_CONTEXT = """
## User Requirement
@ -25,6 +27,24 @@ STRUCTURAL_CONTEXT = """
{current_task}
"""
PLAN_STATUS = """
## Finished Tasks
### code
```python
{code_written}
```
### execution result
{task_results}
## Current Task
{current_task}
## Task Guidance
Write complete code for 'Current Task'. And avoid duplicating code from 'Finished Tasks', such as repeated import of packages, reading data, etc.
Specifically, {guidance}
"""
class Planner(BaseModel):
plan: Plan
@ -136,3 +156,23 @@ class Planner(BaseModel):
context_msg = [Message(content=context, role="user")]
return context_msg + self.working_memory.get()
def get_plan_status(self) -> str:
# prepare components of a plan status
finished_tasks = self.plan.get_finished_tasks()
code_written = [remove_comments(task.code) for task in finished_tasks]
code_written = "\n\n".join(code_written)
task_results = [task.result for task in finished_tasks]
task_results = "\n\n".join(task_results)
task_type_name = self.current_task.task_type.upper()
guidance = TaskType[task_type_name].value.guidance if hasattr(TaskType, task_type_name) else ""
# combine components in a prompt
prompt = PLAN_STATUS.format(
code_written=code_written,
task_results=task_results,
current_task=self.current_task.instruction,
guidance=guidance,
)
return prompt

View file

@ -0,0 +1,57 @@
from enum import Enum
from pydantic import BaseModel
from metagpt.prompts.task_type import (
DATA_PREPROCESS_PROMPT,
EDA_PROMPT,
FEATURE_ENGINEERING_PROMPT,
IMAGE2WEBPAGE_PROMPT,
MODEL_EVALUATE_PROMPT,
MODEL_TRAIN_PROMPT,
)
class TaskTypeDef(BaseModel):
name: str
desc: str = ""
guidance: str = ""
class TaskType(Enum):
EDA = TaskTypeDef(
name="eda",
desc="For performing exploratory data analysis",
guidance=EDA_PROMPT,
)
DATA_PREPROCESS = TaskTypeDef(
name="data_preprocess",
desc="For preprocessing dataset in a data analysis or machine learning task ONLY,"
"general data operation doesn't fall into this type",
guidance=DATA_PREPROCESS_PROMPT,
)
FEATURE_ENGINEERING = TaskTypeDef(
name="feature_engineering",
desc="Only for creating new columns for input data.",
guidance=FEATURE_ENGINEERING_PROMPT,
)
MODEL_TRAIN = TaskTypeDef(
name="model_train",
desc="Only for training model.",
guidance=MODEL_TRAIN_PROMPT,
)
MODEL_EVALUATE = TaskTypeDef(
name="model_evaluate",
desc="Only for evaluating model.",
guidance=MODEL_EVALUATE_PROMPT,
)
IMAGE2WEBPAGE = TaskTypeDef(
name="image2webpage",
desc="For converting image into webpage code.",
guidance=IMAGE2WEBPAGE_PROMPT,
)
OTHER = TaskTypeDef(name="other", desc="Any tasks not in the defined categories")
@property
def type_name(self):
return self.value.name

View file

@ -16,9 +16,8 @@ from sklearn.preprocessing import (
)
from metagpt.tools.tool_registry import register_tool
from metagpt.tools.tool_type import ToolType
TOOL_TYPE = ToolType.DATA_PREPROCESS.type_name
TAGS = ["data preprocessing", "machine learning"]
class MLProcess:
@ -85,7 +84,7 @@ class DataPreprocessTool(MLProcess):
return new_df
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class FillMissingValue(DataPreprocessTool):
"""
Completing missing values with simple strategies.
@ -106,7 +105,7 @@ class FillMissingValue(DataPreprocessTool):
self.model = SimpleImputer(strategy=strategy, fill_value=fill_value)
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class MinMaxScale(DataPreprocessTool):
"""
Transform features by scaling each feature to a range, which is (0, 1).
@ -117,7 +116,7 @@ class MinMaxScale(DataPreprocessTool):
self.model = MinMaxScaler()
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class StandardScale(DataPreprocessTool):
"""
Standardize features by removing the mean and scaling to unit variance.
@ -128,7 +127,7 @@ class StandardScale(DataPreprocessTool):
self.model = StandardScaler()
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class MaxAbsScale(DataPreprocessTool):
"""
Scale each feature by its maximum absolute value.
@ -139,7 +138,7 @@ class MaxAbsScale(DataPreprocessTool):
self.model = MaxAbsScaler()
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class RobustScale(DataPreprocessTool):
"""
Apply the RobustScaler to scale features using statistics that are robust to outliers.
@ -150,7 +149,7 @@ class RobustScale(DataPreprocessTool):
self.model = RobustScaler()
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class OrdinalEncode(DataPreprocessTool):
"""
Encode categorical features as ordinal integers.
@ -161,7 +160,7 @@ class OrdinalEncode(DataPreprocessTool):
self.model = OrdinalEncoder()
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class OneHotEncode(DataPreprocessTool):
"""
Apply one-hot encoding to specified categorical columns, the original columns will be dropped.
@ -180,7 +179,7 @@ class OneHotEncode(DataPreprocessTool):
return new_df
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class LabelEncode(DataPreprocessTool):
"""
Apply label encoding to specified categorical columns in-place.

View file

@ -1,7 +1,6 @@
from imap_tools import MailBox
from metagpt.tools.tool_registry import register_tool
from metagpt.tools.tool_type import ToolType
# Define a dictionary mapping email domains to their IMAP server addresses
IMAP_SERVERS = {
@ -24,7 +23,7 @@ IMAP_SERVERS = {
}
@register_tool(tool_type=ToolType.EMAIL_LOGIN.type_name)
@register_tool()
def email_login_imap(email_address, email_password):
"""
Use imap_tools package to log in to your email (the email that supports IMAP protocol) to verify and return the account object.

View file

@ -19,12 +19,11 @@ from sklearn.preprocessing import KBinsDiscretizer, PolynomialFeatures
from metagpt.tools.libs.data_preprocess import MLProcess
from metagpt.tools.tool_registry import register_tool
from metagpt.tools.tool_type import ToolType
TOOL_TYPE = ToolType.FEATURE_ENGINEERING.type_name
TAGS = ["feature engineering", "machine learning"]
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class PolynomialExpansion(MLProcess):
"""
Add polynomial and interaction features from selected numeric columns to input DataFrame.
@ -67,7 +66,7 @@ class PolynomialExpansion(MLProcess):
return new_df
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class CatCount(MLProcess):
"""
Add value counts of a categorical column as new feature.
@ -92,7 +91,7 @@ class CatCount(MLProcess):
return new_df
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class TargetMeanEncoder(MLProcess):
"""
Encode a categorical column by the mean of the label column, and adds the result as a new feature.
@ -119,7 +118,7 @@ class TargetMeanEncoder(MLProcess):
return new_df
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class KFoldTargetMeanEncoder(MLProcess):
"""
Add a new feature to the DataFrame by k-fold mean encoding of a categorical column using the label column.
@ -159,7 +158,7 @@ class KFoldTargetMeanEncoder(MLProcess):
return new_df
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class CatCross(MLProcess):
"""
Add pairwise crossed features and convert them to numerical features.
@ -216,7 +215,7 @@ class CatCross(MLProcess):
return new_df
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class GroupStat(MLProcess):
"""
Aggregate specified column in a DataFrame grouped by another column, adding new features named '<agg_col>_<agg_func>_by_<group_col>'.
@ -248,7 +247,7 @@ class GroupStat(MLProcess):
return new_df
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class SplitBins(MLProcess):
"""
Inplace binning of continuous data into intervals, returning integer-encoded bin identifiers directly.
@ -276,7 +275,7 @@ class SplitBins(MLProcess):
return new_df
# @register_tool(tool_type=TOOL_TYPE)
# @register_tool(tags=TAGS)
class ExtractTimeComps(MLProcess):
"""
Extract time components from a datetime column and add them as new features.
@ -316,7 +315,7 @@ class ExtractTimeComps(MLProcess):
return new_df
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class GeneralSelection(MLProcess):
"""
Drop all nan feats and feats with only one unique value.
@ -349,7 +348,7 @@ class GeneralSelection(MLProcess):
# skip for now because lgb is needed
# @register_tool(tool_type=TOOL_TYPE)
# @register_tool(tags=TAGS)
class TreeBasedSelection(MLProcess):
"""
Select features based on tree-based model and remove features with low importance.
@ -403,7 +402,7 @@ class TreeBasedSelection(MLProcess):
return new_df
@register_tool(tool_type=TOOL_TYPE)
@register_tool(tags=TAGS)
class VarianceBasedSelection(MLProcess):
"""
Select features based on variance and remove features with low variance.

View file

@ -10,7 +10,6 @@ from pathlib import Path
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.tools.tool_registry import register_tool
from metagpt.tools.tool_type import ToolType
from metagpt.utils.common import encode_image
ANALYZE_LAYOUT_PROMPT = """You are now a UI/UX designer, please generate layout information for this image:
@ -28,9 +27,7 @@ As the design pays tribute to large companies, sometimes it is normal for some c
Now, please generate the corresponding webpage code including HTML, CSS and JavaScript:"""
@register_tool(
tool_type=ToolType.IMAGE2WEBPAGE.type_name, include_functions=["__init__", "generate_webpages", "save_webpages"]
)
@register_tool(include_functions=["__init__", "generate_webpages", "save_webpages"])
class GPTvGenerator:
"""Class for generating webpages at once.

View file

@ -18,7 +18,6 @@ from PIL import Image, PngImagePlugin
from metagpt.const import SD_OUTPUT_FILE_REPO, SOURCE_ROOT
from metagpt.logs import logger
from metagpt.tools.tool_registry import register_tool
from metagpt.tools.tool_type import ToolType
payload = {
"prompt": "",
@ -55,7 +54,7 @@ default_negative_prompt = "(easynegative:0.8),black, dark,Low resolution"
@register_tool(
tool_type=ToolType.STABLE_DIFFUSION.type_name,
tags=["text2image", "multimodal"],
include_functions=["__init__", "simple_run_t2i", "run_t2i", "construct_payload", "save"],
)
class SDEngine:

View file

@ -1,9 +1,8 @@
from metagpt.tools.tool_registry import register_tool
from metagpt.tools.tool_type import ToolType
from metagpt.tools.web_browser_engine_playwright import PlaywrightWrapper
@register_tool(tool_type=ToolType.WEBSCRAPING.type_name)
@register_tool(tags=["web scraping", "web"])
async def scrape_web_playwright(url):
"""
Asynchronously Scrape and save the HTML structure and inner text content of a web page using Playwright.

View file

@ -3,7 +3,7 @@ import inspect
from metagpt.utils.parse_docstring import GoogleDocstringParser, remove_spaces
def convert_code_to_tool_schema(obj, include: list[str] = []):
def convert_code_to_tool_schema(obj, include: list[str] = None):
docstring = inspect.getdoc(obj)
assert docstring, "no docstring found for the objects, skip registering"

View file

@ -1,12 +1,6 @@
from pydantic import BaseModel
class ToolTypeDef(BaseModel):
name: str
desc: str = ""
usage_prompt: str = ""
class ToolSchema(BaseModel):
description: str
@ -16,3 +10,4 @@ class Tool(BaseModel):
path: str
schemas: dict = {}
code: str = ""
tags: list[str] = []

View file

@ -0,0 +1,196 @@
from __future__ import annotations
import json
from typing import Any
import jieba
import numpy as np
from pydantic import BaseModel, field_validator
from rank_bm25 import BM25Okapi
from metagpt.actions import Action
from metagpt.logs import logger
from metagpt.schema import Plan
from metagpt.tools import TOOL_REGISTRY
from metagpt.tools.tool_data_type import Tool
from metagpt.tools.tool_registry import validate_tool_names
from metagpt.utils.common import CodeParser
TOOL_INFO_PROMPT = """
## Capabilities
- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python class or function.
- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc..
## Available Tools:
Each tool is described in JSON format. When you call a tool, import the tool from its path first.
{tool_schemas}
"""
TOOL_RECOMMENDATION_PROMPT = """
## User Requirement:
{current_task}
## Task
Recommend up to {topk} tools from 'Available Tools' that can help solve the 'User Requirement'.
## Available Tools:
{available_tools}
## Tool Selection and Instructions:
- Select tools most relevant to completing the 'User Requirement'.
- If you believe that no tools are suitable, indicate with an empty list.
- Only list the names of the tools, not the full schema of each tool.
- Ensure selected tools are listed in 'Available Tools'.
- Output a json list of tool names:
```json
["tool_name1", "tool_name2", ...]
```
"""
class RecommendTool(Action):
async def run(self, prompt):
return await self._aask(prompt)
class ToolRecommender(BaseModel):
"""
The default ToolRecommender:
1. Recall: If plan exists, use exact match between task type and tool type to recall tools;
If plan doesn't exist (e.g. we use ReAct), return all user-specified tools;
2. Rank: Use LLM to select final candidates from recalled set.
"""
tools: dict[str, Tool] = {}
force: bool = False
@field_validator("tools", mode="before")
@classmethod
def validate_tools(cls, v: list[str]) -> dict[str, Tool]:
if v == ["<all>"]:
return TOOL_REGISTRY.get_all_tools()
else:
return validate_tool_names(v)
async def recommend_tools(
self, context: str = "", plan: Plan = None, recall_topk: int = 20, topk: int = 5
) -> list[Tool]:
"""
Recommends a list of tools based on the given context and plan. The recommendation process includes two stages: recall from a large pool and rank the recalled tools to select the final set.
Args:
context (str): The context for tool recommendation.
plan (Plan): The plan for tool recommendation.
recall_topk (int): The number of tools to recall in the initial step.
topk (int): The number of tools to return after rank as final recommendations.
Returns:
list[Tool]: A list of recommended tools.
"""
if not self.tools:
return []
if self.force or (not context and not plan):
# directly use what users have specified as result for forced recommendation;
# directly use the whole set if there is no useful information
return list(self.tools.values())
recalled_tools = await self.recall_tools(context=context, plan=plan, topk=recall_topk)
if not recalled_tools:
return []
ranked_tools = await self.rank_tools(recalled_tools=recalled_tools, context=context, plan=plan, topk=topk)
logger.info(f"Recommended tools: \n{[tool.name for tool in ranked_tools]}")
return ranked_tools
async def get_recommended_tool_info(self, **kwargs) -> str:
"""
Wrap recommended tools with their info in a string, which can be used directly in a prompt.
"""
recommended_tools = await self.recommend_tools(**kwargs)
if not recommended_tools:
return ""
tool_schemas = {tool.name: tool.schemas for tool in recommended_tools}
return TOOL_INFO_PROMPT.format(tool_schemas=tool_schemas)
async def recall_tools(self, context: str = "", plan: Plan = None, topk: int = 20) -> list[Tool]:
"""
Retrieves a list of relevant tools from a large pool, based on the given context and plan.
"""
raise NotImplementedError
async def rank_tools(
self, recalled_tools: list[Tool], context: str = "", plan: Plan = None, topk: int = 5
) -> list[Tool]:
"""
Default rank methods for a ToolRecommender. Use LLM to rank the recalled tools based on the given context, plan, and topk value.
"""
current_task = plan.current_task.instruction if plan else context
available_tools = {tool.name: tool.schemas["description"] for tool in recalled_tools}
prompt = TOOL_RECOMMENDATION_PROMPT.format(
current_task=current_task,
available_tools=available_tools,
topk=topk,
)
rsp = await RecommendTool().run(prompt)
rsp = CodeParser.parse_code(block=None, text=rsp)
ranked_tools = json.loads(rsp)
valid_tools = validate_tool_names(ranked_tools)
return list(valid_tools.values())[:topk]
class BM25ToolRecommender(ToolRecommender):
"""
A ToolRecommender using BM25 at the recall stage:
1. Recall: Querying tool descriptions with task instruction if plan exists. Otherwise, return all user-specified tools;
2. Rank: LLM rank, the same as the default ToolRecommender.
"""
bm25: Any = None
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._init_corpus()
def _init_corpus(self):
corpus = [f"{tool.name} {tool.tags}: {tool.schemas['description']}" for tool in self.tools.values()]
tokenized_corpus = [self._tokenize(doc) for doc in corpus]
self.bm25 = BM25Okapi(tokenized_corpus)
def _tokenize(self, text):
return jieba.lcut(text) # FIXME: needs more sophisticated tokenization
async def recall_tools(self, context: str = "", plan: Plan = None, topk: int = 20) -> list[Tool]:
query = plan.current_task.instruction if plan else context
query_tokens = self._tokenize(query)
doc_scores = self.bm25.get_scores(query_tokens)
top_indexes = np.argsort(doc_scores)[::-1][:topk]
recalled_tools = [list(self.tools.values())[index] for index in top_indexes]
print([doc_scores[index] for index in top_indexes])
print([recalled_tools[i].name for i in range(len(recalled_tools))])
print([recalled_tools[i].schemas["description"] for i in range(len(recalled_tools))])
return recalled_tools
class EmbeddingToolRecommender(ToolRecommender):
"""
NOTE: To be implemented.
A ToolRecommender using embeddings at the recall stage:
1. Recall: Use embeddings to calculate the similarity between query and tool info;
2. Rank: LLM rank, the same as the default ToolRecommender.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
async def recall_tools(self, context: str = "", plan: Plan = None, topk: int = 20) -> list[Tool]:
pass

View file

@ -10,26 +10,20 @@ from __future__ import annotations
import inspect
import os
from collections import defaultdict
from typing import Union
import yaml
from pydantic import BaseModel, field_validator
from pydantic import BaseModel
from metagpt.const import TOOL_SCHEMA_PATH
from metagpt.logs import logger
from metagpt.tools.tool_convert import convert_code_to_tool_schema
from metagpt.tools.tool_data_type import Tool, ToolSchema, ToolTypeDef
from metagpt.tools.tool_type import ToolType
from metagpt.tools.tool_data_type import Tool, ToolSchema
class ToolRegistry(BaseModel):
tools: dict = {}
tool_types: dict = {}
tools_by_types: dict = defaultdict(dict) # two-layer k-v, {tool_type: {tool_name: {...}, ...}, ...}
@field_validator("tool_types", mode="before")
@classmethod
def init_tool_types(cls, tool_types: ToolType):
return {tool_type.type_name: tool_type.value for tool_type in tool_types}
tools_by_tags: dict = defaultdict(dict) # two-layer k-v, {tag: {tool_name: {...}, ...}, ...}
def register_tool(
self,
@ -37,25 +31,15 @@ class ToolRegistry(BaseModel):
tool_path,
schema_path="",
tool_code="",
tool_type="other",
tags=None,
tool_source_object=None,
include_functions=[],
include_functions=None,
verbose=False,
):
if self.has_tool(tool_name):
return
if tool_type not in self.tool_types:
# register new tool type on the fly
logger.warning(
f"{tool_type} not previously defined, will create a temporary tool type with just a name. This tool type is only effective during this runtime. You may consider add this tool type with more configs permanently at metagpt.tools.tool_type"
)
temp_tool_type_obj = ToolTypeDef(name=tool_type)
self.tool_types[tool_type] = temp_tool_type_obj
if verbose:
logger.info(f"tool type {tool_type} registered")
schema_path = schema_path or TOOL_SCHEMA_PATH / tool_type / f"{tool_name}.yml"
schema_path = schema_path or TOOL_SCHEMA_PATH / f"{tool_name}.yml"
schemas = make_schema(tool_source_object, include_functions, schema_path)
@ -70,10 +54,11 @@ class ToolRegistry(BaseModel):
# logger.warning(
# f"{tool_name} schema not conforms to required format, but will be used anyway. Mismatch: {e}"
# )
tool = Tool(name=tool_name, path=tool_path, schemas=schemas, code=tool_code)
tags = tags or []
tool = Tool(name=tool_name, path=tool_path, schemas=schemas, code=tool_code, tags=tags)
self.tools[tool_name] = tool
self.tools_by_types[tool_type][tool_name] = tool
for tag in tags:
self.tools_by_tags[tag].update({tool_name: tool})
if verbose:
logger.info(f"{tool_name} registered")
logger.info(f"schema made at {str(schema_path)}, can be used for checking")
@ -84,24 +69,24 @@ class ToolRegistry(BaseModel):
def get_tool(self, key) -> Tool:
return self.tools.get(key)
def get_tools_by_type(self, key) -> dict[str, Tool]:
return self.tools_by_types.get(key, {})
def get_tools_by_tag(self, key) -> dict[str, Tool]:
return self.tools_by_tags.get(key, {})
def has_tool_type(self, key) -> bool:
return key in self.tool_types
def get_all_tools(self) -> dict[str, Tool]:
return self.tools
def get_tool_type(self, key) -> ToolType:
return self.tool_types.get(key)
def has_tool_tag(self, key) -> bool:
return key in self.tools_by_tags
def get_tool_types(self) -> dict[str, ToolType]:
return self.tool_types
def get_tool_tags(self) -> list[str]:
return list(self.tools_by_tags.keys())
# Registry instance
TOOL_REGISTRY = ToolRegistry(tool_types=ToolType)
TOOL_REGISTRY = ToolRegistry()
def register_tool(tool_type: str = "other", schema_path: str = "", **kwargs):
def register_tool(tags: list[str] = None, schema_path: str = "", **kwargs):
"""register a tool to registry"""
def decorator(cls):
@ -117,7 +102,7 @@ def register_tool(tool_type: str = "other", schema_path: str = "", **kwargs):
tool_path=file_path,
schema_path=schema_path,
tool_code=source_code,
tool_type=tool_type,
tags=tags,
tool_source_object=cls,
**kwargs,
)
@ -142,14 +127,15 @@ def make_schema(tool_source_object, include, path):
return schema
def validate_tool_names(tools: list[str], return_tool_object=False) -> list[str]:
valid_tools = []
for tool_name in tools:
if not TOOL_REGISTRY.has_tool(tool_name):
logger.warning(
f"Specified tool {tool_name} not found and was skipped. Check if you have registered it properly"
)
def validate_tool_names(tools: Union[list[str], str]) -> str:
assert isinstance(tools, list), "tools must be a list of str"
valid_tools = {}
for key in tools:
# one can define either tool names or tool type names, take union to get the whole set
if TOOL_REGISTRY.has_tool(key):
valid_tools.update({key: TOOL_REGISTRY.get_tool(key)})
elif TOOL_REGISTRY.tool_tool_tag(key):
valid_tools.update(TOOL_REGISTRY.get_tools_by_tag(key))
else:
valid_tool = TOOL_REGISTRY.get_tool(tool_name) if return_tool_object else tool_name
valid_tools.append(valid_tool)
logger.warning(f"invalid tool name or tool type name: {key}, skipped")
return valid_tools

View file

@ -0,0 +1,66 @@
import pytest
from metagpt.schema import Plan, Task
from metagpt.tools import TOOL_REGISTRY
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
@pytest.fixture
def mock_plan(mocker):
task_map = {
"1": Task(
task_id="1",
instruction="conduct feature engineering, add new features on the dataset",
task_type="feature_engineering",
)
}
plan = Plan(
goal="test requirement",
tasks=list(task_map.values()),
task_map=task_map,
current_task_id="1",
)
return plan
def test_tr_init():
tr = ToolRecommender(tools=["FillMissingValue", "PolynomialExpansion", "web_scraping", "non-existing tool"])
# web_scraping is a tool type, it has one tool scrape_web_playwright
assert list(tr.tools.keys()) == [
"FillMissingValue",
"PolynomialExpansion",
"scrape_web_playwright",
]
def test_tr_init_default_tools_value():
tr = ToolRecommender()
assert tr.tools == {}
def test_tr_init_tools_all():
tr = ToolRecommender(tools="<all>")
assert list(tr.tools.keys()) == list(TOOL_REGISTRY.get_all_tools().keys())
@pytest.mark.asyncio
async def test_tr_recall_with_plan(mock_plan):
tr = ToolRecommender(
tools=[
"FillMissingValue",
"PolynomialExpansion",
"web_scraping",
]
)
result = await tr.recall_tools(plan=mock_plan)
assert len(result) == 1
assert result[0].name == "PolynomialExpansion"
@pytest.mark.asyncio
async def test_bm25_tr_recall(mock_plan):
tr = BM25ToolRecommender(tools=["FillMissingValue", "PolynomialExpansion", "web_scraping"])
result = await tr.recall_tools(plan=mock_plan)
# print(result)
assert len(result) == 3
assert result[0].name == "PolynomialExpansion"