Merge branch 'dev_tool_selection' of https://gitlab.deepwisdomai.com/agents/data_agents_opt into dev_tool_selection

This commit is contained in:
stellahsr 2023-12-13 14:49:29 +08:00
commit 7e164eecb3
8 changed files with 334 additions and 412 deletions

View file

@ -3,7 +3,7 @@ from typing import Dict, List, Union, Tuple, Optional, Any
from metagpt.actions import Action
from metagpt.logs import logger
from metagpt.schema import Message, Plan
from metagpt.utils.common import CodeParser
from metagpt.utils.common import CodeParser, create_func_config
from metagpt.actions.write_analysis_code import BaseWriteAnalysisCode
DEBUG_REFLECTION_EXAMPLE = '''Example 1:
@ -39,25 +39,39 @@ DEBUG_REFLECTION_EXAMPLE = '''Example 1:
REFLECTION_PROMPT = """
Here is an example for you.
{debug_example}
[requirement]
{goal}
[finished code]
finished code are executable, and you should based on the code to continue your current code debug
{finished_code}
try to reuse the code here to understand the coding task.
[context]
{context}
[previous impl]
{code}
[runtime Error]
{runtime_result}
Analysis the error step by step, provide me improve method. Do not repeat [previous impl]
Analysis the error step by step, provide me improve method and code. Remember to follow [context] requirement.
[reflection on previous impl]:
xxx
"""
# Function-call schema handed to the LLM (through create_func_config) so that
# one debugging round returns both the reflection text and the refined code.
CODE_REFLECTION = {
    "name": "execute_reflection_code",
    "description": "Execute reflection code.",
    "parameters": {
        "type": "object",
        "properties": {
            # Free-text analysis of what went wrong in the previous attempt.
            "reflection": {
                "type": "string",
                "description": "Reflection on previous impl.",
            },
            # Replacement code produced after the reflection.
            "improved_impl": {
                "type": "string",
                "description": "Refined code after reflection.",
            },
        },
        "required": ["reflection", "improved_impl"],
    },
}
def message_to_str(message: Message) -> str:
    """Render one message as a single ``"<role>: <content>"`` string."""
    role, content = message.role, message.content
    return f"{role}: {content}"
@ -75,52 +89,68 @@ class DebugCode(BaseWriteAnalysisCode):
def __init__(self, **kwargs: Any):
    """Create the action, forwarding every keyword argument to the parent class."""
    super().__init__(**kwargs)
async def run_reflection(self, goal, finished_code, finished_code_result, code, runtime_result) -> str:
async def run_reflection(
self,
# goal,
# finished_code,
# finished_code_result,
context: List[Message],
code,
runtime_result,
) -> dict:
info = []
finished_code_and_result = finished_code + "\n [finished results]\n\n" + finished_code_result
# finished_code_and_result = finished_code + "\n [finished results]\n\n" + finished_code_result
reflection_prompt = REFLECTION_PROMPT.format(debug_example=DEBUG_REFLECTION_EXAMPLE,
goal=goal,
finished_code=finished_code_and_result,
context=context,
# goal=goal,
# finished_code=finished_code_and_result,
code=code,
runtime_result=runtime_result
)
system_prompt = "You are an AI Python assistant. You will be given your previous implementation of a function, runtime error results, and a hint to change the implementation appropriately. Write your full implementation "
system_prompt = "You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation "
info.append(Message(role="system", content=system_prompt))
info.append(Message(role="assistant", content=reflection_prompt))
info.append(Message(role="user", content=reflection_prompt))
msg = messages_to_str(info)
resp = await self.llm.aask(msg=msg)
# msg = messages_to_str(info)
# resp = await self.llm.aask(msg=msg)
resp = await self.llm.aask_code(messages=info, **create_func_config(CODE_REFLECTION))
logger.info(f"reflection is {resp}")
return resp
async def rewrite_code(self, reflection: str = "", code_context: str = "") -> str:
    """Ask the LLM to rewrite the code according to a reflection.

    Args:
        reflection: Reflection text on the previous implementation.
        code_context: Already finished code, embedded in the prompt as context.

    Returns:
        The code extracted from the LLM reply by ``CodeParser.parse_code``.
    """
    context_note = (
        f"[code context]:{code_context}"
        f"finished code are executable, and you should based on the code to continue your current code debug and improvement"
        f"[reflection]: \n {reflection}"
    )
    dialogue = [
        Message(role="assistant", content=context_note),
        Message(role="user", content=f"[improved impl]:\n Return in Python block"),
    ]
    # The whole dialogue is flattened into a single prompt string.
    reply = await self.llm.aask(msg=messages_to_str(dialogue))
    logger.info(f"improve code is {reply}")
    return CodeParser.parse_code(block=None, text=reply)
# async def rewrite_code(self, reflection: str = "", context: List[Message] = None) -> str:
# """
# 根据reflection重写代码
# """
# info = context
# # info.append(Message(role="assistant", content=f"[code context]:{code_context}"
# # f"finished code are executable, and you should based on the code to continue your current code debug and improvement"
# # f"[reflection]: \n {reflection}"))
# info.append(Message(role="assistant", content=f"[reflection]: \n {reflection}"))
# info.append(Message(role="user", content=f"[improved impl]:\n Return in Python block"))
# msg = messages_to_str(info)
# resp = await self.llm.aask(msg=msg)
# improv_code = CodeParser.parse_code(block=None, text=resp)
# return improv_code
async def run(self,
context: List[Message] = None,
plan: str = "",
finished_code: str = "",
finished_code_result: str = "",
# finished_code: str = "",
# finished_code_result: str = "",
code: str = "",
runtime_result: str = "") -> str:
"""
根据当前运行代码和报错信息进行reflection和纠错
"""
reflection = await self.run_reflection(plan, finished_code=finished_code,
finished_code_result=finished_code_result,
code=code,
runtime_result=runtime_result)
reflection = await self.run_reflection(
# plan,
# finished_code=finished_code,
# finished_code_result=finished_code_result,
code=code,
context=context,
runtime_result=runtime_result,
)
# 根据reflection结果重写代码
improv_code = await self.rewrite_code(reflection, code_context=finished_code)
# improv_code = await self.rewrite_code(reflection, context=context)
improv_code = reflection['improved_impl']
return improv_code

View file

@ -4,7 +4,9 @@
@Author : orange-crow
@File : write_code_v2.py
"""
from typing import Dict, List, Union, Tuple, Optional, Any
from typing import Dict, List, Union, Tuple
import yaml
from metagpt.actions import Action
from metagpt.logs import logger
@ -15,11 +17,9 @@ from metagpt.prompts.ml_engineer import (
TOOL_USAGE_PROMPT,
ML_SPECIFIC_PROMPT,
ML_MODULE_MAP,
TOOL_OUTPUT_DESC, DATA_PROCESS_PROMPT,
GENERATE_CODE_PROMPT
GENERATE_CODE_PROMPT,
)
from metagpt.schema import Message, Plan
from metagpt.tools.functions import registry
from metagpt.utils.common import create_func_config, remove_comments
@ -100,40 +100,55 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode):
class WriteCodeWithTools(BaseWriteAnalysisCode):
"""Write code with help of local available tools. Choose tools first, then generate code to use the tools"""
@staticmethod
def _parse_recommend_tools(module: str, recommend_tools: list) -> List[Dict]:
def __init__(self, name: str = "", context=None, llm=None, schema_path=None):
    """Create the action and, when a schema location is given, load its tools.

    Args:
        name: Forwarded to the parent initializer.
        context: Forwarded to the parent initializer.
        llm: Forwarded to the parent initializer.
        schema_path: Location handed to ``_load_tools``; ``None`` leaves
            ``available_tools`` empty.
    """
    super().__init__(name, context, llm)
    self.available_tools = {}
    self.schema_path = schema_path
    if schema_path is None:
        return
    self._load_tools(schema_path)
def _load_tools(self, schema_path):
    """Load tool schemas from every ``*.yml`` file found in ``schema_path``.

    Each file's stem becomes a module key in ``self.available_tools`` and the
    parsed YAML content becomes that module's tool mapping.

    Args:
        schema_path: Directory holding the schema files, given either as a
            ``pathlib.Path`` or as a plain string path.
    """
    from pathlib import Path

    # Accept plain strings as well as Path objects; str has no .glob().
    for yml_file in Path(schema_path).glob("*.yml"):
        with open(yml_file, "r", encoding="utf-8") as f:
            # safe_load yields None for an empty document; store an empty
            # mapping instead so later .keys()/.items() lookups do not fail.
            self.available_tools[yml_file.stem] = yaml.safe_load(f) or {}
def _parse_recommend_tools(self, module: str, recommend_tools: list) -> dict:
"""
Parses and validates a list of recommended tools, and retrieves their schema from registry.
Args:
module (str): The module name for querying tools in the registry.
recommend_tools (list): A list of lists of recommended tools for each step.
recommend_tools (list): A list of recommended tools.
Returns:
List[Dict]: A list of dicts of valid tool schemas.
dict: A dict of valid tool schemas.
"""
valid_tools = []
available_tools = registry.get_all_by_module(module).keys()
available_tools = self.available_tools[module].keys()
for tool in recommend_tools:
if tool in available_tools:
valid_tools.append(tool)
tool_catalog = registry.get_schemas(module, valid_tools)
tool_catalog = {tool: self.available_tools[module][tool] for tool in valid_tools}
return tool_catalog
async def _tool_recommendation(
self,
task: str,
code_steps: str,
available_tools: list
self,
task: str,
code_steps: str,
available_tools: dict,
) -> list:
"""
Recommend tools for the specified task.
Args:
context (List[Message]): Action output history, source action denoted by Message.cause_by
task (str): the task to recommend tools for
code_steps (str): the code steps to generate the full code for the task
available_tools (list): the available tools for the task
available_tools (dict): the available tools description
Returns:
list: recommended tools for the specified task
@ -149,27 +164,23 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
return recommend_tools
async def run(
self,
context: List[Message],
plan: Plan = None,
code_steps: str = "",
column_info: str = "",
**kwargs,
) -> str:
self,
context: List[Message],
plan: Plan = None,
code_steps: str = "",
column_info: str = "",
**kwargs,
) -> Tuple[List[Message], str]:
task_type = plan.current_task.task_type
available_tools = registry.get_all_schema_by_module(task_type)
available_tools = self.available_tools.get(task_type, {})
special_prompt = ML_SPECIFIC_PROMPT.get(task_type, "")
column_names = kwargs.get("column_names", {})
finished_tasks = plan.get_finished_tasks()
code_context = [remove_comments(task.code) for task in finished_tasks]
code_context = "\n\n".join(code_context)
if len(available_tools) > 0:
available_tools = [
{k: tool[k] for k in ["name", "description"] if k in tool}
for tool in available_tools
]
available_tools = {k: v["description"] for k, v in available_tools.items()}
recommend_tools = await self._tool_recommendation(
plan.current_task.instruction,
@ -180,46 +191,27 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
logger.info(f"Recommended tools: \n{recommend_tools}")
module_name = ML_MODULE_MAP[task_type]
output_desc = TOOL_OUTPUT_DESC.get(task_type, "")
new_code = ""
for idx, tool in enumerate(recommend_tools):
hist_info = f"Previous finished code is \n\n ```Python {code_context} ``` \n\n "
prompt = TOOL_USAGE_PROMPT.format(
goal=plan.current_task.instruction,
context=hist_info,
code_steps=code_steps,
column_names=column_names,
special_prompt=special_prompt,
module_name=module_name,
output_desc=output_desc,
function_catalog=tool_catalog[idx],
)
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)
rsp = await self.llm.aask_code(prompt, **tool_config)
logger.info(f"rsp is: {rsp}")
# final_code = final_code + "\n\n" + rsp["code"]
# final_code[key] = rsp["code"]
new_code = new_code + "\n\n" + rsp["code"]
code_context = code_context + "\n\n" + rsp["code"]
return new_code
else:
hist_info = f"Previous finished code is \n\n ```Python {code_context} ``` \n\n "
prompt = GENERATE_CODE_PROMPT.format(
goal=plan.current_task.instruction,
context=hist_info,
code_steps=code_steps,
prompt = TOOL_USAGE_PROMPT.format(
user_requirement=plan.goal,
history_code=code_context,
current_task=plan.current_task.instruction,
column_info=column_info,
special_prompt=special_prompt,
# column_names=column_names
code_steps=code_steps,
module_name=module_name,
tool_catalog=tool_catalog,
)
else:
prompt = GENERATE_CODE_PROMPT.format(
user_requirement=plan.goal,
history_code=code_context,
current_task=plan.current_task.instruction,
column_info=column_info,
special_prompt=special_prompt,
code_steps=code_steps,
)
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)
logger.info(f"prompt is: {prompt}")
rsp = await self.llm.aask_code(prompt, **tool_config)
logger.info(f"rsp is: {rsp}")
return rsp["code"]
tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS)
rsp = await self.llm.aask_code(prompt, **tool_config)
context = [Message(content=prompt, role="user")]
return context, rsp["code"]

View file

@ -63,8 +63,8 @@ class WriteCodeSteps(Action):
def get_context(self, plan: Plan):
user_requirement = plan.goal
# select_task_keys = ['task_id', 'instruction', 'is_finished', 'code']
select_task_keys = ['task_id','code']
select_task_keys = ['task_id', 'instruction', 'is_finished', 'code']
# select_task_keys = ['task_id','code']
def process_task(task):
task_dict = task.dict()
@ -72,7 +72,7 @@ class WriteCodeSteps(Action):
return ptask
tasks = json.dumps(
[process_task(task) for task in plan.tasks if task.is_finished==True], indent=4, ensure_ascii=False
[process_task(task) for task in plan.tasks], indent=4, ensure_ascii=False
)
current_task = json.dumps(process_task(plan.current_task)) if plan.current_task else {}

View file

@ -4,6 +4,31 @@
# @Author : lidanyang
# @File : ml_engineer
# @Desc :
# Prompt template asking the LLM for code that refreshes and prints the
# dataset's column information. The single placeholder `{history_code}` is
# filled with the code of the already finished tasks.
UPDATE_DATA_COLUMNS = """
# Background
Keep dataset column information updated to reflect changes in training or testing datasets, aiding in informed decision-making during data analysis.
## Done Tasks
```python
{history_code}
```end
# Task
Update and print the dataset's column information only if the train or test data has changed. Use the following code:
```python
from metagpt.tools.functions.libs.data_preprocess import get_column_info
column_info = get_column_info(df)
print("df_column_info")
print(column_info)
```end
# Constraints:
- Use the DataFrame variable from 'Done Tasks' in place of df.
- Import `get_column_info` only if it's not already imported.
- Skip update if no changes in training/testing data, except for initial data load.
- No need to update info if only model evaluation is performed.
"""
GEN_DATA_DESC_PROMPT = """
Here is the head 5 rows of the dataset:
{data_head}
@ -34,7 +59,8 @@ Please assign a task type to each task in the list below from the given categori
- **feature_engineering**: Only for creating new columns for input data.
- **data_preprocess**: Only for changing value inplace.
- **model_train**: Only for training model.
- **other**: Any tasks that do not fit into the previous categories, such as visualization, summarizing findings, build model, etc.
- **model_evaluate**: Only for evaluating model.
- **other**: Any tasks that do not fit into the previous categories, such as visualization, summarizing findings, etc.
"""
ASSIGN_TASK_TYPE = {
@ -107,206 +133,122 @@ CODE_GENERATOR_WITH_TOOLS = {
},
}
TOOL_USAGE_PROMPT = """
## Target
{goal}
Specifically, {special_prompt}
## History Info
{context}
## Code Steps for Current Task:
Follow steps below when you writing code if it's convenient.
{code_steps}
## Available Tools:
Each function is described in JSON format, including the function name and parameters. {output_desc}
{function_catalog}
When you call a function above, you should import the function from `{module_name}` first, e.g.:
```python
from metagpt.tools.functions.libs.data_preprocess import fill_missing_value
```end
## Your Output Format:
Generate the complete code for this task:
```python
# Tools used: [function names or 'none']
<your code for the current task, without any comments>
```end
## Attention:
Make sure use the columns from the dataset columns: {column_names}
Finish your coding tasks as a helpful programmer based on the tools.
"""
# Function-call schema for the column-info refresh step: the LLM reports
# whether an update is needed and supplies the code for a new notebook cell.
PRINT_DATA_COLUMNS = {
    "name": "print_column_info",
    "description": "Print the latest column information after 'Done Tasks' code if first read or data changed.",
    "parameters": {
        "type": "object",
        "properties": {
            "is_update": {
                "type": "boolean",
                "description": "Whether need to update the column info.",
            },
            "code": {
                "type": "string",
                "description": "The code to be added to a new cell in jupyter.",
            },
        },
        "required": ["is_update", "code"],
    },
}
GENERATE_CODE_PROMPT = """
## Target
{goal}
Specifically, {special_prompt}
## Finished Task and Code
{context}
## Code Steps for Current Task:
Follow steps below when you writing code if it's convenient.
{code_steps}
## Instruction
Finished task and code are executable, and you should based on the code to continue your current task
Do not repeat functions and code, try to reuse the code in [Finished Task and Code]
## Your Output Format:
Generate the complete code for this task:
```python
import pandas as pd
```
## Attention:
Make sure use the columns from the dataset columns
Finish your coding tasks as a helpful programmer based on the code.
"""
TOOL_USAGE_PROMPT = """
## Target
{goal}
## History Info
{context}
## Available Tools:
Each function is described in JSON format, including the function name and parameters. {output_desc}
{function_catalog}
When you call a function above, you should import the function from `{module_name}` first, e.g.:
```python
from metagpt.tools.functions.libs.data_preprocess import fill_missing_value
```end
## Your Output Format:
Generate the complete code for this task:
```python
# Tools used: [function names or 'none']
<your code for the current task, without any comments>
```end
## Attention:
Make sure use the columns from the dataset columns
Finish your coding tasks as a helpful programmer based on the tools.
"""
TOOL_ORGANIZATION_PROMPT = """
The previous conversation has provided all tasks step-by-step for the use goal and their statuses.
Now, begin writing code for the current task. This code should writen strictly on the basis of all previous completed tasks code, not a standalone code. And avoid writing duplicate code that has already been written in previous tasks, such as repeated import of packages, reading data, etc.
Specifically, {special_prompt}
You can utilize pre-defined tools in 'Available Tools' if the tools are sufficient. And you should combine the use of other public packages if necessary, like sklearn, numpy, pandas, etc..
## Code Steps for Current Task:
Follow steps below when you writing code if it's convenient.
{code_steps}
## Available Tools:
Each function is described in JSON format, including the function name and parameters. {output_desc}
{function_catalog}
When you call a function above, you should import the function from `{module_name}` first, e.g.:
```python
from metagpt.tools.functions.libs.data_preprocess import fill_missing_value
```end
## Your Output Format:
Generate the complete code for this task:
```python
# Tools used: [function names or 'none']
<your code for the current task, without any comments>
```end
*** Important Rules ***
- If you use tool not in the list, you should implement it by yourself.
- Ensure the output new code is executable in the same Jupyter notebook environment with previous tasks code have been executed.
- When write code for current task, remember the code should be coherent with previous tasks code.
- Remember that don't process the columns have been processed in previous tasks and don't mock data yourself.
- Prioritize using tools for the same functionality.
"""
DATA_PREPROCESS_PROMPT = """
The current task is about data preprocessing, closely monitor each column's data type. Apply suitable methods for various types (numerical, categorical, datetime, textual, etc.) to ensure the pandas.DataFrame is correctly formatted.
Additionally, ensure that the columns being processed must be the ones that actually exist in the dataset.
Don't write processed data to files.
"""
FEATURE_ENGINEERING_PROMPT = """
The current task is about feature engineering. when performing it, please adhere to the following principles:
- Ensure that the feature you're working with is indeed present in the dataset and consider the data type (numerical, categorical, etc.) and application scenario (classification, regression tasks, etc.).
- When generate new features, you should combine real world knowledge and decide what features are useful for the task.
- Generate as diverse features as possible to improve the model's performance.
- Before generating a new feature, ensure the used features are already processed and ready to use.
"""
DATA_PROCESS_PROMPT = """
# Background
As a data scientist, you need to help user to achieve the goal [{user_requirement}] step-by-step in an continuous Jupyter notebook.
Assist in completing [{user_requirement}] in a Jupyter notebook.
## Done Tasks
## Task Progress
### Done Tasks
```python
{history_code}
```end
## Current Task
### Current Task
{current_task}
# Latest Data Info
Latest data info after previous tasks:
## Latest Data Info
{column_info}
# Task
Write a Python function for 'Current Task'. Start by copying the input DataFrame. Avoid duplicating code from 'Done Tasks'.
Specifically, {special_prompt}
Fully implement 'Current Task', ensuring all necessary steps are covered without repeating code from 'Done Tasks'. Specifically, {special_prompt}
# Code Steps:
Follow steps below when you writing code if it's convenient.
{code_steps}
"""
TOOL_USAGE_PROMPT = """
# Background
Assist in completing [{user_requirement}] in a Jupyter notebook.
## Task Progress
### Done Tasks
```python
{history_code}
```end
### Current Task
{current_task}
## Latest Data Info
{column_info}
# Task
Fully implement 'Current Task', ensuring all necessary steps are covered without repeating code from 'Done Tasks'. Specifically, {special_prompt}
# Code Steps:
Follow steps below when you writing code if it's convenient.
{code_steps}
# Capabilities
- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of python functions.
- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python Class.
- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc..
- You can do anything about data preprocessing, feature engineering, model training, etc..
# Available Tools:
Each function tool is described in JSON format. {output_desc}
When you call a function below, import the function from `{module_name}` first.
{function_catalog}
Each Class tool is described in JSON format. When you call it, import the tool from `{module_name}` first.
{tool_catalog}
# Output Example:
when current task is "fill missing value and handle outliers", the output code be like:
For "fill missing value and handle outliers", the output code be like when there are training data and test data:
```python
from metagpt.tools.functions.libs.data_preprocess import fill_missing_value
# Tools used: ['FillMissingValue']
from metagpt.tools.functions.libs.data_preprocess import FillMissingValue
def function_name(df):
df_processed = df.copy()
num_cols = df_processed.select_dtypes(include='number').columns.tolist()
df_processed = fill_missing_value(df_processed, num_cols, 'mean')
for col in num_cols:
low, high = df_processed[col].quantile([0.01, 0.99])
df_processed[col] = df_processed[col].clip(low, high)
return df_processed
train_processed = train.copy()
test_processed = test.copy()
num_cols = train_processed.select_dtypes(include='number').columns.tolist()
fill_missing_value = FillMissingValue(features=num_cols, strategy='mean')
fill_missing_value.fit(train_processed)
train_processed = fill_missing_value.transform(train_processed)
test_processed = fill_missing_value.transform(test_processed)
df_processed = function_name(df)
print(df_processed.info())
for col in num_cols:
low, high = train_processed[col].quantile([0.01, 0.99])
train_processed[col] = train_processed[col].clip(low, high)
test_processed[col] = test_processed[col].clip(low, high)
```end
# Constraints:
- Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed.
- Prioritize using pre-defined tools for the same functionality.
- Return DataFrame should always be named `df_processed`, while the input DataFrame should based on the done tasks' output DataFrame.
- Limit to one print statement for the output DataFrame's info.
- Copy DataFrame before processing if needed.
- If 'Code Steps' contains step done in 'Done Tasks', such as reading data, don't repeat it.
"""
DATA_PREPROCESS_PROMPT = """
The current task is about data preprocessing, please note the following:
- Monitor data types per column, applying appropriate methods.
- Ensure operations are on existing dataset columns.
- Avoid writing processed data to files.
- Prefer alternatives to one-hot encoding for categorical data.
- Only encode necessary categorical columns to allow for potential feature-specific engineering tasks later.
"""
FEATURE_ENGINEERING_PROMPT = """
The current task is about feature engineering. when performing it, please adhere to the following principles:
- Ensure operations are on existing dataset columns and consider the data type (numerical, categorical, etc.) and application scenario (classification, regression tasks, etc.).
- Create impactful features based on real-world knowledge and column info.
- Generate as diverse features as possible to improve the model's performance.
- If potential impactful features are not included in 'Code Steps', add new steps to generate them.
"""
MODEL_TRAIN_PROMPT = """
@ -316,23 +258,17 @@ The current task is about training a model, please ensure high performance:
- Use the data from previous task result directly, do not mock or reload data yourself.
"""
DATA_PREPROCESS_OUTPUT_DESC = "Please note that all functions output a updated pandas.DataFrame after data preprocessing."
FEATURE_ENGINEERING_OUTPUT_DESC = "Please note that all functions output a updated pandas.DataFrame with new features added or existing features modified."
CLASSIFICATION_MODEL_OUTPUT_DESC = ""
REGRESSION_MODEL_OUTPUT_DESC = ""
# Task-specific guidance for model evaluation tasks; looked up through
# ML_SPECIFIC_PROMPT under the "model_evaluate" key.
MODEL_EVALUATE_PROMPT = """
The current task is about evaluating a model, please note the following:
- Ensure that the evaluated data is same processed as the training data.
- Use trained model from previous task result directly, do not mock or reload model yourself.
"""
ML_SPECIFIC_PROMPT = {
"data_preprocess": DATA_PREPROCESS_PROMPT,
"feature_engineering": FEATURE_ENGINEERING_PROMPT,
"model_train": MODEL_TRAIN_PROMPT,
}
TOOL_OUTPUT_DESC = {
"data_preprocess": DATA_PREPROCESS_OUTPUT_DESC,
"feature_engineering": FEATURE_ENGINEERING_OUTPUT_DESC,
"model_evaluate": MODEL_EVALUATE_PROMPT,
}
ML_MODULE_MAP = {

View file

@ -1,5 +1,6 @@
import json
import re
from datetime import datetime
from typing import List
import fire
@ -10,12 +11,16 @@ from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools
from metagpt.actions.write_code_steps import WriteCodeSteps
from metagpt.actions.write_plan import WritePlan
from metagpt.const import DATA_PATH
from metagpt.const import DATA_PATH, PROJECT_ROOT
from metagpt.logs import logger
from metagpt.prompts.ml_engineer import GEN_DATA_DESC_PROMPT
from metagpt.prompts.ml_engineer import (
GEN_DATA_DESC_PROMPT,
UPDATE_DATA_COLUMNS,
PRINT_DATA_COLUMNS
)
from metagpt.roles import Role
from metagpt.schema import Message, Plan
from metagpt.utils.common import CodeParser
from metagpt.utils.common import CodeParser, remove_comments, create_func_config
from metagpt.actions.debug_code import DebugCode
STRUCTURAL_CONTEXT = """
@ -57,34 +62,6 @@ def remove_escape_and_color_codes(input_str):
return result
def read_data(file: str) -> pd.DataFrame:
    """Load a CSV file, trying alternative delimiters when needed.

    The file is first parsed as comma-separated. While the parsed frame has
    exactly one column, it is re-read with the next delimiter from a fixed
    list (``;``, tab, ``:``, space, ``|``).

    Args:
        file: Path to the data file; must end with ``.csv``.

    Returns:
        The parsed DataFrame.

    Raises:
        ValueError: If ``file`` does not end with ``.csv``.
    """
    if not file.endswith(".csv"):
        raise ValueError(f"Unsupported file type: {file}")

    frame = pd.read_csv(file, sep=",")
    for delimiter in (";", "\t", ":", " ", "|"):
        # Stop as soon as the frame no longer has exactly one column.
        if frame.shape[1] != 1:
            break
        frame = pd.read_csv(file, sep=delimiter)
    return frame
def get_column_info(df: pd.DataFrame) -> str:
    """Summarize every column of ``df`` as a plain-text table.

    Args:
        df: The DataFrame to describe.

    Returns:
        A table rendered without its index, one row per column, listing the
        column name, its dtype, the percentage of missing values (rounded to
        two significant digits) and the value of ``nunique()``.
    """
    header = ["Column_name", "Data_type", "NaN_Frequency(%)", "N_unique"]
    rows = [
        [
            name,
            df[name].dtype,
            float("%.2g" % (df[name].isna().mean() * 100)),
            df[name].nunique(),
        ]
        for name in df.columns
    ]
    return pd.DataFrame(rows, columns=header).to_string(index=False)
class AskReview(Action):
async def run(self, context: List[Message], plan: Plan = None):
logger.info("Current overall plan:")
@ -108,26 +85,20 @@ class AskReview(Action):
return rsp, confirmed
class GenerateDataDesc(Action):
    """Action that asks the LLM to describe a dataset from its first rows."""

    async def run(self, file: str) -> dict:
        """Build a description of the dataset stored at ``file``.

        Args:
            file: Path of the data file, read through ``read_data``.

        Returns:
            A dict with the keys ``path``, ``data_desc``, ``column_desc`` and
            ``column_info``; the two ``*_desc`` values come from the LLM
            reply, ``column_info`` from ``get_column_info``.
        """
        df = read_data(file)
        head_json = json.dumps(df.head().to_dict(orient="list"), indent=4, ensure_ascii=False)
        reply = await self._aask(GEN_DATA_DESC_PROMPT.replace("{data_head}", head_json))
        # The reply is expected to carry a JSON payload inside a code block.
        parsed = json.loads(CodeParser.parse_code(block=None, text=reply))
        return {
            "path": file,
            "data_desc": parsed["data_desc"],
            "column_desc": parsed["column_desc"],
            "column_info": get_column_info(df),
        }
class UpdateDataColumns(Action):
    """Action that asks the LLM whether the column info needs refreshing and for the code to do it."""

    async def run(self, plan: Plan = None) -> dict:
        """Query the LLM with the finished tasks' code.

        Args:
            plan: Plan whose finished tasks provide the code history.

        Returns:
            The result of ``llm.aask_code`` called with the
            ``PRINT_DATA_COLUMNS`` function configuration.
        """
        history_code = "\n\n".join(
            [remove_comments(done.code) for done in plan.get_finished_tasks()]
        )
        prompt = UPDATE_DATA_COLUMNS.format(history_code=history_code)
        tool_config = create_func_config(PRINT_DATA_COLUMNS)
        return await self.llm.aask_code(prompt, **tool_config)
class MLEngineer(Role):
def __init__(
self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, data_path: str = None
self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False,
):
super().__init__(name=name, profile=profile, goal=goal)
self._set_react_mode(react_mode="plan_and_act")
@ -136,19 +107,15 @@ class MLEngineer(Role):
self.use_code_steps = True
self.execute_code = ExecutePyCode()
self.auto_run = auto_run
self.data_path = data_path
self.data_desc = {}
async def _plan_and_act(self):
if self.data_path:
self.data_desc = await self._generate_data_desc()
# create initial plan and update until confirmation
await self._update_plan()
while self.plan.current_task:
task = self.plan.current_task
logger.info(f"ready to take on task: {task}")
logger.info(f"ready to take on task {task}")
# take on current task
code, result, success, code_steps = await self._write_and_exec_code()
@ -156,13 +123,6 @@ class MLEngineer(Role):
# ask for acceptance, users can either refuse or change tasks in the plan
task_result_confirmed = await self._ask_review()
# 针对当前task进行单独plan
# if not success or not task_result_confirmed:
# # fixme: 增加对应plan
# logger.info(task.result)
# # import pdb;pdb.set_trace()
# # self.state.plan()
if success and task_result_confirmed:
# tick off this task and record progress
task.code = code
@ -170,25 +130,27 @@ class MLEngineer(Role):
task.code_steps = code_steps
self.plan.finish_current_task()
self.working_memory.clear()
if "print(df_processed.info())" in code:
self.data_desc["column_info"] = result
success, new_code = await self._update_data_columns()
if success:
task.code = task.code + "\n\n" + new_code
else:
# update plan according to user's feedback and to take on changed tasks
await self._update_plan()
# finished_tasks = self.plan.get_finished_tasks()
# if len(finished_tasks) == len(self.plan.tasks):
# code_context = [task.code for task in finished_tasks]
# code_context = "\n\n".join(code_context)
# result, success = await self.execute_code.run(code_context)
# # truncated the result
# print(truncate(result))
async def _generate_data_desc(self):
data_desc = await GenerateDataDesc().run(self.data_path)
return data_desc
time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
self.execute_code.save_notebook(f"{DATA_PATH}/notebooks/ml_{time}.ipynb")
async def _update_data_columns(self):
    """Refresh the cached column info when the LLM reports that an update is needed.

    Returns:
        A tuple ``(success, code)``. ``success`` is true only when an update
        was requested and the generated code executed successfully; ``code``
        is the code returned by the LLM in either case.
    """
    rsp = await UpdateDataColumns().run(self.plan)
    needs_update = rsp["is_update"]
    code = rsp["code"]
    if not needs_update:
        return False, code
    result, success = await self.execute_code.run(code)
    if success:
        # The execution output becomes the latest column info.
        self.data_desc["column_info"] = result
    return success, code
async def _write_and_exec_code(self, max_retry: int = 3):
code_steps = (
await WriteCodeSteps().run(self.plan)
@ -199,6 +161,7 @@ class MLEngineer(Role):
counter = 0
improve_code = ""
success = False
debug_context = []
finished_tasks = self.plan.get_finished_tasks()
code_context = [task.code for task in finished_tasks]
@ -207,35 +170,38 @@ class MLEngineer(Role):
code_result = "\n\n".join(code_result)
while not success and counter < max_retry:
if counter == 0:
context = self.get_useful_memories()
else:
context = self.get_useful_memories()
if counter > 0:
improve_code = await DebugCode().run(plan=self.plan.current_task.instruction,
finished_code=code_context,
finished_code_result=code_result,
# finished_code=code_context,
# finished_code_result=code_result,
code=code,
runtime_result=self.working_memory.get())
if not self.use_tools or self.plan.current_task.task_type == "other":
runtime_result=self.working_memory.get(),
context=debug_context)
if improve_code != "":
code = improve_code
logger.info(f"new code \n{improve_code}")
cause_by = DebugCode
elif not self.use_tools or self.plan.current_task.task_type == "other":
logger.info("Write code with pure generation")
code = await WriteCodeByGenerate().run(
context=context, plan=self.plan, code_steps=code_steps, temperature=0.0
)
debug_context = [self.get_useful_memories(task_exclude_field={'result', 'code_steps'})[0]]
cause_by = WriteCodeByGenerate
else:
logger.info("Write code with tools")
if improve_code != "":
code = improve_code
logger.info(f"new code {code}")
cause_by = DebugCode
else:
code = await WriteCodeWithTools().run(
context=context, plan=self.plan, code_steps=code_steps, **{"column_names": {}}
)
cause_by = WriteCodeWithTools
schema_path = PROJECT_ROOT / "metagpt/tools/functions/schemas"
tool_context, code = await WriteCodeWithTools(schema_path=schema_path).run(
context=context,
plan=self.plan,
code_steps=code_steps,
column_info=self.data_desc.get("column_info", ""),
)
debug_context = tool_context
cause_by = WriteCodeWithTools
self.working_memory.add(
Message(content=code, role="assistant", cause_by=cause_by)
@ -243,9 +209,7 @@ class MLEngineer(Role):
# debug on code, run on runcode with finished code and new_df
# runcode = code_context + "\n\n" + code
runcode = code
result, success = await self.execute_code.run(runcode)
result, success = await self.execute_code.run(code)
# truncate the result before printing
print(truncate(result))
@ -294,12 +258,12 @@ class MLEngineer(Role):
self.plan.add_tasks(tasks)
self.working_memory.clear()
def get_useful_memories(self) -> List[Message]:
def get_useful_memories(self, task_exclude_field: set = None) -> List[Message]:
"""find useful memories only to reduce context length and improve performance"""
# TODO dataset description , code steps
user_requirement = self.plan.goal
tasks = json.dumps(
[task.dict() for task in self.plan.tasks], indent=4, ensure_ascii=False
[task.dict(exclude=task_exclude_field) for task in self.plan.tasks], indent=4, ensure_ascii=False
)
current_task = self.plan.current_task.json() if self.plan.current_task else {}
context = STRUCTURAL_CONTEXT.format(
@ -326,12 +290,13 @@ if __name__ == "__main__":
# requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy."
data_path = f"{DATA_PATH}/titanic"
requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."
async def main(requirement: str = requirement, auto_run: bool = True, data_path: str = ""):
role = MLEngineer(goal=requirement, auto_run=auto_run, data_path=data_path)
# data_path = f"{DATA_PATH}/titanic"
# requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."
# requirement = f"Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy"
data_path = f"{DATA_PATH}/icr-identify-age-related-conditions"
requirement = f"This is a medical dataset with over fifty anonymized health characteristics linked to three age-related conditions. Your goal is to predict whether a subject has or has not been diagnosed with one of these conditions.The target column is Class. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report f1 score on the eval data. Train data path: {data_path}/split_train.csv, eval data path: {data_path}/split_eval.csv."
async def main(requirement: str = requirement, auto_run: bool = True):
    """Create an ``MLEngineer`` for ``requirement`` and await its ``run`` on that same requirement."""
    engineer = MLEngineer(goal=requirement, auto_run=auto_run)
    await engineer.run(requirement)

View file

@ -1,6 +1,6 @@
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import KBinsDiscretizer, LabelEncoder
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MaxAbsScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder
@ -8,7 +8,6 @@ from sklearn.preprocessing import OrdinalEncoder
from sklearn.preprocessing import RobustScaler
from sklearn.preprocessing import StandardScaler
from metagpt.tools.functions import registry
from metagpt.tools.functions.libs.base import MLProcess
from metagpt.tools.functions.schemas.data_preprocess import *
@ -57,15 +56,6 @@ class StandardScale(MLProcess):
return df
@registry.register("data_preprocess", LogTransform)
def log_transform(df: pd.DataFrame, features: list):
    """Replace each listed column of ``df`` with its natural logarithm, in place.

    A column whose minimum is less than or equal to zero is first shifted so
    that its minimum becomes 2, which keeps every value inside the domain of
    the logarithm.

    Args:
        df: Frame whose columns are overwritten.
        features: Names of the columns to transform.

    Returns:
        The same ``df`` object that was passed in.
    """
    for name in features:
        lowest = df[name].min()
        if lowest <= 0:
            # Shift so the smallest value maps to 2 before taking the log.
            df[name] = df[name] - lowest + 2
        df[name] = np.log(df[name])
    return df
class MaxAbsScale(MLProcess):
def __init__(self, features: list,):
self.features = features
@ -146,7 +136,7 @@ class LabelEncode(MLProcess):
return df
def get_column_info(df: pd.DataFrame) -> str:
def get_column_info(df: pd.DataFrame) -> dict:
data = []
for i in df.columns:
nan_freq = float("%.2g" % (df[i].isna().mean() * 100))
@ -157,7 +147,7 @@ def get_column_info(df: pd.DataFrame) -> str:
data,
columns=["Column_name", "Data_type", "NaN_Frequency(%)", "N_unique"],
)
return samples.to_string(index=False)
return samples.to_dict(orient='list')
#
#
# if __name__ == '__main__':

View file

@ -10,6 +10,7 @@ import numpy as np
from dateutil.relativedelta import relativedelta
from joblib import Parallel, delayed
from pandas.api.types import is_numeric_dtype
from pandas.core.dtypes.common import is_object_dtype
from sklearn.model_selection import KFold
from sklearn.preprocessing import PolynomialFeatures, KBinsDiscretizer
@ -280,6 +281,10 @@ class GeneralSelection(MLProcess):
or df.loc[df[col] == np.inf].shape[0] != 0
):
feats.remove(col)
if is_object_dtype(df[col]) and df[col].nunique() == df.shape[0]:
feats.remove(col)
self.feats = feats
def transform(self, df: pd.DataFrame) -> pd.DataFrame:

View file

@ -328,7 +328,7 @@ GroupStat:
SplitBins:
type: class
description: "Bin continuous data into intervals and return the bin identifier encoded as an integer value"
description: "Inplace binning of continuous data into intervals, returning integer-encoded bin identifiers directly."
methods:
__init__:
description: "Initialize self."
@ -336,11 +336,15 @@ SplitBins:
properties:
cols:
type: list
description: "Columns to be binned."
description: "Columns to be binned inplace."
strategy:
type: str
description: "Strategy used to define the widths of the bins."
default: quantile
enum:
- quantile
- uniform
- kmeans
required:
- cols
fit: