From 49779d8615e4b05b759b549b6d7ceb9b5258ec0a Mon Sep 17 00:00:00 2001 From: lidanyang Date: Wed, 13 Dec 2023 13:35:22 +0800 Subject: [PATCH 1/5] refine schema desc --- metagpt/tools/functions/schemas/feature_engineering.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/metagpt/tools/functions/schemas/feature_engineering.yml b/metagpt/tools/functions/schemas/feature_engineering.yml index 2cc4ec2fa..4f2a7100d 100644 --- a/metagpt/tools/functions/schemas/feature_engineering.yml +++ b/metagpt/tools/functions/schemas/feature_engineering.yml @@ -328,7 +328,7 @@ GroupStat: SplitBins: type: class - description: "Bin continuous data into intervals and return the bin identifier encoded as an integer value" + description: "Inplace binning of continuous data into intervals, returning integer-encoded bin identifiers directly." methods: __init__: description: "Initialize self." @@ -336,11 +336,15 @@ SplitBins: properties: cols: type: list - description: "Columns to be binned." + description: "Columns to be binned inplace." strategy: type: str description: "Strategy used to define the widths of the bins." default: quantile + enum: + - quantile + - uniform + - kmeans required: - cols fit: From f614fbfa7c3e22e968bc4229271df092c3be9575 Mon Sep 17 00:00:00 2001 From: lidanyang Date: Wed, 13 Dec 2023 13:37:40 +0800 Subject: [PATCH 2/5] update ml tools --- metagpt/tools/functions/libs/data_preprocess.py | 16 +++------------- .../tools/functions/libs/feature_engineering.py | 5 +++++ 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/metagpt/tools/functions/libs/data_preprocess.py b/metagpt/tools/functions/libs/data_preprocess.py index 39474b0fd..fa70bf8fc 100644 --- a/metagpt/tools/functions/libs/data_preprocess.py +++ b/metagpt/tools/functions/libs/data_preprocess.py @@ -1,6 +1,6 @@ import numpy as np from sklearn.impute import SimpleImputer -from sklearn.preprocessing import KBinsDiscretizer, LabelEncoder +from sklearn.preprocessing import LabelEncoder from sklearn.preprocessing import MaxAbsScaler from sklearn.preprocessing import MinMaxScaler from sklearn.preprocessing import OneHotEncoder @@ -8,7 +8,6 @@ from sklearn.preprocessing import OrdinalEncoder from sklearn.preprocessing import RobustScaler from sklearn.preprocessing import StandardScaler -from metagpt.tools.functions import registry from metagpt.tools.functions.libs.base import MLProcess from metagpt.tools.functions.schemas.data_preprocess import * @@ -57,15 +56,6 @@ class StandardScale(MLProcess): return df -@registry.register("data_preprocess", LogTransform) -def log_transform(df: pd.DataFrame, features: list, ): - for col in features: - if df[col].min() <= 0: - df[col] = df[col] - df[col].min() + 2 - df[col] = np.log(df[col]) - return df - - class MaxAbsScale(MLProcess): def __init__(self, features: list,): self.features = features @@ -146,7 +136,7 @@ class LabelEncode(MLProcess): return df -def get_column_info(df: pd.DataFrame) -> str: +def get_column_info(df: pd.DataFrame) -> dict: data = [] for i in df.columns: nan_freq = float("%.2g" % (df[i].isna().mean() * 100)) @@ -157,7 +147,7 @@ def get_column_info(df: pd.DataFrame) -> str: data, columns=["Column_name", "Data_type", "NaN_Frequency(%)", "N_unique"], ) - return samples.to_string(index=False) + return samples.to_dict(orient='list') # # # if __name__ == '__main__': diff --git a/metagpt/tools/functions/libs/feature_engineering.py b/metagpt/tools/functions/libs/feature_engineering.py index 67247d0d1..de54e4db0 100644 --- a/metagpt/tools/functions/libs/feature_engineering.py +++ b/metagpt/tools/functions/libs/feature_engineering.py @@ -10,6 +10,7 @@ import numpy as np from dateutil.relativedelta import relativedelta from joblib import Parallel, delayed from pandas.api.types import is_numeric_dtype +from pandas.core.dtypes.common import is_object_dtype from sklearn.model_selection import KFold from sklearn.preprocessing import PolynomialFeatures, KBinsDiscretizer @@ -280,6 +281,10 @@ class GeneralSelection(MLProcess): or df.loc[df[col] == np.inf].shape[0] != 0 ): feats.remove(col) + + if is_object_dtype(df[col]) and df[col].nunique() == df.shape[0]: + feats.remove(col) + self.feats = feats def transform(self, df: pd.DataFrame) -> pd.DataFrame: From 33810829a072467a8f61f2f7dc14ffd1792e793a Mon Sep 17 00:00:00 2001 From: lidanyang Date: Wed, 13 Dec 2023 14:31:32 +0800 Subject: [PATCH 3/5] support tool in debug --- metagpt/actions/debug_code.py | 106 ++++++++++++++++++++++------------ 1 file changed, 68 insertions(+), 38 deletions(-) diff --git a/metagpt/actions/debug_code.py b/metagpt/actions/debug_code.py index 9efe93efc..53ca2f54d 100644 --- a/metagpt/actions/debug_code.py +++ b/metagpt/actions/debug_code.py @@ -3,7 +3,7 @@ from typing import Dict, List, Union, Tuple, Optional, Any from metagpt.actions import Action from metagpt.logs import logger from metagpt.schema import Message, Plan -from metagpt.utils.common import CodeParser +from metagpt.utils.common import CodeParser, create_func_config from metagpt.actions.write_analysis_code import BaseWriteAnalysisCode DEBUG_REFLECTION_EXAMPLE = '''Example 1: @@ -39,25 +39,39 @@ DEBUG_REFLECTION_EXAMPLE = '''Example 1: REFLECTION_PROMPT = """ Here is an example for you. {debug_example} - [requirement] - {goal} - [finished code] - finished code are executable, and you should based on the code to continue your current code debug - {finished_code} - - try to reuse the code here to understand the coding task. + [context] + {context} [previous impl] {code} [runtime Error] {runtime_result} - Analysis the error step by step, provide me improve method. Do not repeat [previous impl] + Analysis the error step by step, provide me improve method and code. Remember to follow [context] requirement. [reflection on previous impl]: xxx """ +CODE_REFLECTION = { + "name": "execute_reflection_code", + "description": "Execute reflection code.", + "parameters": { + "type": "object", + "properties": { + "reflection": { + "type": "string", + "description": "Reflection on previous impl.", + }, + "improved_impl": { + "type": "string", + "description": "Refined code after reflection.", + }, + }, + "required": ["reflection", "improved_impl"], + }, +} + def message_to_str(message: Message) -> str: return f"{message.role}: {message.content}" @@ -75,52 +89,68 @@ class DebugCode(BaseWriteAnalysisCode): def __init__(self, **kwargs: Any): super().__init__(**kwargs) - async def run_reflection(self, goal, finished_code, finished_code_result, code, runtime_result) -> str: + async def run_reflection( + self, + # goal, + # finished_code, + # finished_code_result, + context: List[Message], + code, + runtime_result, + ) -> dict: info = [] - finished_code_and_result = finished_code + "\n [finished results]\n\n" + finished_code_result + # finished_code_and_result = finished_code + "\n [finished results]\n\n" + finished_code_result reflection_prompt = REFLECTION_PROMPT.format(debug_example=DEBUG_REFLECTION_EXAMPLE, - goal=goal, - finished_code=finished_code_and_result, + context=context, + # goal=goal, + # finished_code=finished_code_and_result, code=code, runtime_result=runtime_result ) - system_prompt = "You are an AI Python assistant. You will be given your previous implementation of a function, runtime error results, and a hint to change the implementation appropriately. Write your full implementation " + system_prompt = "You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation " info.append(Message(role="system", content=system_prompt)) - info.append(Message(role="assistant", content=reflection_prompt)) + info.append(Message(role="user", content=reflection_prompt)) - msg = messages_to_str(info) - resp = await self.llm.aask(msg=msg) + # msg = messages_to_str(info) + # resp = await self.llm.aask(msg=msg) + resp = await self.llm.aask_code(messages=info, **create_func_config(CODE_REFLECTION)) logger.info(f"reflection is {resp}") return resp - async def rewrite_code(self, reflection: str = "", code_context: str = "") -> str: - """ - 根据reflection重写代码 - """ - info = [] - info.append(Message(role="assistant", content=f"[code context]:{code_context}" - f"finished code are executable, and you should based on the code to continue your current code debug and improvement" - f"[reflection]: \n {reflection}")) - info.append(Message(role="user", content=f"[improved impl]:\n Return in Python block")) - msg = messages_to_str(info) - resp = await self.llm.aask(msg=msg) - logger.info(f"improve code is {resp}") - improv_code = CodeParser.parse_code(block=None, text=resp) - return improv_code + # async def rewrite_code(self, reflection: str = "", context: List[Message] = None) -> str: + # """ + # 根据reflection重写代码 + # """ + # info = context + # # info.append(Message(role="assistant", content=f"[code context]:{code_context}" + # # f"finished code are executable, and you should based on the code to continue your current code debug and improvement" + # # f"[reflection]: \n {reflection}")) + # info.append(Message(role="assistant", content=f"[reflection]: \n {reflection}")) + # info.append(Message(role="user", content=f"[improved impl]:\n Return in Python block")) + # msg = messages_to_str(info) + # resp = await self.llm.aask(msg=msg) + # improv_code = CodeParser.parse_code(block=None, text=resp) + # return improv_code async def run(self, + context: List[Message] = None, plan: str = "", - finished_code: str = "", - finished_code_result: str = "", + # finished_code: str = "", + # finished_code_result: str = "", code: str = "", runtime_result: str = "") -> str: """ 根据当前运行代码和报错信息进行reflection和纠错 """ - reflection = await self.run_reflection(plan, finished_code=finished_code, - finished_code_result=finished_code_result, - code=code, - runtime_result=runtime_result) + reflection = await self.run_reflection( + # plan, + # finished_code=finished_code, + # finished_code_result=finished_code_result, + code=code, + context=context, + runtime_result=runtime_result, + ) # 根据reflection结果重写代码 - improv_code = await self.rewrite_code(reflection, code_context=finished_code) + # improv_code = await self.rewrite_code(reflection, context=context) + improv_code = reflection['improved_impl'] return improv_code From ab7af7768c00acc0c3f900430b402c64637f7b0f Mon Sep 17 00:00:00 2001 From: lidanyang Date: Wed, 13 Dec 2023 14:31:50 +0800 Subject: [PATCH 4/5] refine prompt --- metagpt/prompts/ml_engineer.py | 298 +++++++++++++-------------------- 1 file changed, 117 insertions(+), 181 deletions(-) diff --git a/metagpt/prompts/ml_engineer.py b/metagpt/prompts/ml_engineer.py index 5c7b9f82e..d11cbf453 100644 --- a/metagpt/prompts/ml_engineer.py +++ b/metagpt/prompts/ml_engineer.py @@ -4,6 +4,31 @@ # @Author : lidanyang # @File : ml_engineer # @Desc : +UPDATE_DATA_COLUMNS = """ +# Background +Keep dataset column information updated to reflect changes in training or testing datasets, aiding in informed decision-making during data analysis. +## Done Tasks +```python +{history_code} +```end + +# Task +Update and print the dataset's column information only if the train or test data has changed. Use the following code: +```python +from metagpt.tools.functions.libs.data_preprocess import get_column_info + +column_info = get_column_info(df) +print("df_column_info") +print(column_info) +```end + +# Constraints: +- Use the DataFrame variable from 'Done Tasks' in place of df. +- Import `get_column_info` only if it's not already imported. +- Skip update if no changes in training/testing data, except for initial data load. +- No need to update info if only model evaluation is performed. +""" + GEN_DATA_DESC_PROMPT = """ Here is the head 5 rows of the dataset: {data_head} @@ -34,7 +59,8 @@ Please assign a task type to each task in the list below from the given categori - **feature_engineering**: Only for creating new columns for input data. - **data_preprocess**: Only for changing value inplace. - **model_train**: Only for training model. -- **other**: Any tasks that do not fit into the previous categories, such as visualization, summarizing findings, build model, etc. +- **model_evaluate**: Only for evaluating model. +- **other**: Any tasks that do not fit into the previous categories, such as visualization, summarizing findings, etc. """ ASSIGN_TASK_TYPE = { @@ -107,206 +133,122 @@ CODE_GENERATOR_WITH_TOOLS = { }, } -TOOL_USAGE_PROMPT = """ -## Target -{goal} -Specifically, {special_prompt} - -## History Info -{context} - -## Code Steps for Current Task: -Follow steps below when you writing code if it's convenient. -{code_steps} - -## Available Tools: -Each function is described in JSON format, including the function name and parameters. {output_desc} -{function_catalog} - -When you call a function above, you should import the function from `{module_name}` first, e.g.: -```python -from metagpt.tools.functions.libs.data_preprocess import fill_missing_value -```end - -## Your Output Format: -Generate the complete code for this task: -```python -# Tools used: [function names or 'none'] - -```end - -## Attention: -Make sure use the columns from the dataset columns: {column_names} -Finish your coding tasks as a helpful programmer based on the tools. - -""" +PRINT_DATA_COLUMNS = { + "name": "print_column_info", + "description": "Print the latest column information after 'Done Tasks' code if first read or data changed.", + "parameters": { + "type": "object", + "properties": { + "is_update": { + "type": "boolean", + "description": "Whether need to update the column info.", + }, + "code": { + "type": "string", + "description": "The code to be added to a new cell in jupyter.", + }, + }, + "required": ["is_update", "code"], + }, +} GENERATE_CODE_PROMPT = """ -## Target -{goal} - -Specifically, {special_prompt} - - -## Finished Task and Code -{context} - -## Code Steps for Current Task: -Follow steps below when you writing code if it's convenient. -{code_steps} - -## Instruction -Finished task and code are executable, and you should based on the code to continue your current task -Do not repeat functions and code, try to reuse the code in [Finished Task and Code] - -## Your Output Format: -Generate the complete code for this task: -```python -import pandas as pd - -``` - -## Attention: -Make sure use the columns from the dataset columns -Finish your coding tasks as a helpful programmer based on the code. - -""" - -TOOL_USAGE_PROMPT = """ -## Target -{goal} - -## History Info -{context} - -## Available Tools: -Each function is described in JSON format, including the function name and parameters. {output_desc} -{function_catalog} - -When you call a function above, you should import the function from `{module_name}` first, e.g.: -```python -from metagpt.tools.functions.libs.data_preprocess import fill_missing_value -```end - -## Your Output Format: -Generate the complete code for this task: -```python -# Tools used: [function names or 'none'] - -```end - -## Attention: -Make sure use the columns from the dataset columns -Finish your coding tasks as a helpful programmer based on the tools. -""" - -TOOL_ORGANIZATION_PROMPT = """ -The previous conversation has provided all tasks step-by-step for the use goal and their statuses. -Now, begin writing code for the current task. This code should writen strictly on the basis of all previous completed tasks code, not a standalone code. And avoid writing duplicate code that has already been written in previous tasks, such as repeated import of packages, reading data, etc. -Specifically, {special_prompt} -You can utilize pre-defined tools in 'Available Tools' if the tools are sufficient. And you should combine the use of other public packages if necessary, like sklearn, numpy, pandas, etc.. - -## Code Steps for Current Task: -Follow steps below when you writing code if it's convenient. -{code_steps} - -## Available Tools: -Each function is described in JSON format, including the function name and parameters. {output_desc} -{function_catalog} - -When you call a function above, you should import the function from `{module_name}` first, e.g.: -```python -from metagpt.tools.functions.libs.data_preprocess import fill_missing_value -```end - -## Your Output Format: -Generate the complete code for this task: -```python -# Tools used: [function names or 'none'] - -```end - -*** Important Rules *** -- If you use tool not in the list, you should implement it by yourself. -- Ensure the output new code is executable in the same Jupyter notebook environment with previous tasks code have been executed. -- When write code for current task, remember the code should be coherent with previous tasks code. -- Remember that don't process the columns have been processed in previous tasks and don't mock data yourself. -- Prioritize using tools for the same functionality. -""" - -DATA_PREPROCESS_PROMPT = """ -The current task is about data preprocessing, closely monitor each column's data type. Apply suitable methods for various types (numerical, categorical, datetime, textual, etc.) to ensure the pandas.DataFrame is correctly formatted. -Additionally, ensure that the columns being processed must be the ones that actually exist in the dataset. -Don't write processed data to files. -""" - -FEATURE_ENGINEERING_PROMPT = """ -The current task is about feature engineering. when performing it, please adhere to the following principles: -- Ensure that the feature you're working with is indeed present in the dataset and consider the data type (numerical, categorical, etc.) and application scenario (classification, regression tasks, etc.). -- When generate new features, you should combine real world knowledge and decide what features are useful for the task. -- Generate as diverse features as possible to improve the model's performance. -- Before generating a new feature, ensure the used features are already processed and ready to use. -""" - -DATA_PROCESS_PROMPT = """ # Background -As a data scientist, you need to help user to achieve the goal [{user_requirement}] step-by-step in an continuous Jupyter notebook. +Assist in completing [{user_requirement}] in a Jupyter notebook. -## Done Tasks +## Task Progress +### Done Tasks ```python {history_code} ```end -## Current Task +### Current Task {current_task} -# Latest Data Info -Latest data info after previous tasks: +## Latest Data Info {column_info} # Task -Write a Python function for 'Current Task'. Start by copying the input DataFrame. Avoid duplicating code from 'Done Tasks'. -Specifically, {special_prompt} +Fully implement 'Current Task', ensuring all necessary steps are covered without repeating code from 'Done Tasks'. Specifically, {special_prompt} + +# Code Steps: +Follow steps below when you writing code if it's convenient. +{code_steps} +""" + +TOOL_USAGE_PROMPT = """ +# Background +Assist in completing [{user_requirement}] in a Jupyter notebook. + +## Task Progress +### Done Tasks +```python +{history_code} +```end + +### Current Task +{current_task} + +## Latest Data Info +{column_info} + +# Task +Fully implement 'Current Task', ensuring all necessary steps are covered without repeating code from 'Done Tasks'. Specifically, {special_prompt} # Code Steps: Follow steps below when you writing code if it's convenient. {code_steps} # Capabilities -- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of python functions. +- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python Class. - You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc.. -- You can do anything about data preprocessing, feature engineering, model training, etc.. # Available Tools: -Each function tool is described in JSON format. {output_desc} -When you call a function below, import the function from `{module_name}` first. -{function_catalog} +Each Class tool is described in JSON format. When you call it, import the tool from `{module_name}` first. +{tool_catalog} # Output Example: -when current task is "fill missing value and handle outliers", the output code be like: +For "fill missing value and handle outliers", the output code be like when there are training data and test data: ```python -from metagpt.tools.functions.libs.data_preprocess import fill_missing_value +# Tools used: ['FillMissingValue'] +from metagpt.tools.functions.libs.data_preprocess import FillMissingValue -def function_name(df): - df_processed = df.copy() - num_cols = df_processed.select_dtypes(include='number').columns.tolist() - df_processed = fill_missing_value(df_processed, num_cols, 'mean') - - for col in num_cols: - low, high = df_processed[col].quantile([0.01, 0.99]) - df_processed[col] = df_processed[col].clip(low, high) - return df_processed +train_processed = train.copy() +test_processed = test.copy() +num_cols = train_processed.select_dtypes(include='number').columns.tolist() +fill_missing_value = FillMissingValue(features=num_cols, strategy='mean') +fill_missing_value.fit(train_processed) +train_processed = fill_missing_value.transform(train_processed) +test_processed = fill_missing_value.transform(test_processed) -df_processed = function_name(df) -print(df_processed.info()) +for col in num_cols: + low, high = train_processed[col].quantile([0.01, 0.99]) + train_processed[col] = train_processed[col].clip(low, high) + test_processed[col] = test_processed[col].clip(low, high) ```end # Constraints: -- Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed. - Prioritize using pre-defined tools for the same functionality. -- Return DataFrame should always be named `df_processed`, while the input DataFrame should based on the done tasks' output DataFrame. -- Limit to one print statement for the output DataFrame's info. +- Copy DataFrame before processing if needed. +- If 'Code Steps' contains step done in 'Done Tasks', such as reading data, don't repeat it. +""" + +DATA_PREPROCESS_PROMPT = """ +The current task is about data preprocessing, please note the following: +- Monitor data types per column, applying appropriate methods. +- Ensure operations are on existing dataset columns. +- Avoid writing processed data to files. +- Prefer alternatives to one-hot encoding for categorical data. +- Only encode necessary categorical columns to allow for potential feature-specific engineering tasks later. +""" + +FEATURE_ENGINEERING_PROMPT = """ +The current task is about feature engineering. when performing it, please adhere to the following principles: +- Ensure operations are on existing dataset columns and consider the data type (numerical, categorical, etc.) and application scenario (classification, regression tasks, etc.). +- Create impactful features based on real-world knowledge and column info. +- Generate as diverse features as possible to improve the model's performance. +- If potential impactful features are not included in 'Code Steps', add new steps to generate them. """ MODEL_TRAIN_PROMPT = """ @@ -316,23 +258,17 @@ The current task is about training a model, please ensure high performance: - Use the data from previous task result directly, do not mock or reload data yourself. """ -DATA_PREPROCESS_OUTPUT_DESC = "Please note that all functions output a updated pandas.DataFrame after data preprocessing." - -FEATURE_ENGINEERING_OUTPUT_DESC = "Please note that all functions output a updated pandas.DataFrame with new features added or existing features modified." - -CLASSIFICATION_MODEL_OUTPUT_DESC = "" - -REGRESSION_MODEL_OUTPUT_DESC = "" +MODEL_EVALUATE_PROMPT = """ +The current task is about evaluating a model, please note the following: +- Ensure that the evaluated data is same processed as the training data. +- Use trained model from previous task result directly, do not mock or reload model yourself. +""" ML_SPECIFIC_PROMPT = { "data_preprocess": DATA_PREPROCESS_PROMPT, "feature_engineering": FEATURE_ENGINEERING_PROMPT, "model_train": MODEL_TRAIN_PROMPT, -} - -TOOL_OUTPUT_DESC = { - "data_preprocess": DATA_PREPROCESS_OUTPUT_DESC, - "feature_engineering": FEATURE_ENGINEERING_OUTPUT_DESC, + "model_evaluate": MODEL_EVALUATE_PROMPT, } ML_MODULE_MAP = { From 537d51c26e29a1774269825e3611667b2436e80d Mon Sep 17 00:00:00 2001 From: lidanyang Date: Wed, 13 Dec 2023 14:32:25 +0800 Subject: [PATCH 5/5] write code with class tool --- metagpt/actions/write_analysis_code.py | 130 +++++++++---------- metagpt/roles/ml_engineer.py | 170 ++++++++++--------------- 2 files changed, 131 insertions(+), 169 deletions(-) diff --git a/metagpt/actions/write_analysis_code.py b/metagpt/actions/write_analysis_code.py index 58cab9c6a..aceebbfeb 100644 --- a/metagpt/actions/write_analysis_code.py +++ b/metagpt/actions/write_analysis_code.py @@ -4,7 +4,9 @@ @Author : orange-crow @File : write_code_v2.py """ -from typing import Dict, List, Union, Tuple, Optional, Any +from typing import Dict, List, Union, Tuple + +import yaml from metagpt.actions import Action from metagpt.logs import logger @@ -15,11 +17,9 @@ from metagpt.prompts.ml_engineer import ( TOOL_USAGE_PROMPT, ML_SPECIFIC_PROMPT, ML_MODULE_MAP, - TOOL_OUTPUT_DESC, DATA_PROCESS_PROMPT, - GENERATE_CODE_PROMPT + GENERATE_CODE_PROMPT, ) from metagpt.schema import Message, Plan -from metagpt.tools.functions import registry from metagpt.utils.common import create_func_config, remove_comments @@ -100,40 +100,55 @@ class WriteCodeByGenerate(BaseWriteAnalysisCode): class WriteCodeWithTools(BaseWriteAnalysisCode): """Write code with help of local available tools. Choose tools first, then generate code to use the tools""" - @staticmethod - def _parse_recommend_tools(module: str, recommend_tools: list) -> List[Dict]: + def __init__(self, name: str = "", context=None, llm=None, schema_path=None): + super().__init__(name, context, llm) + self.schema_path = schema_path + self.available_tools = {} + + if self.schema_path is not None: + self._load_tools(schema_path) + + def _load_tools(self, schema_path): + """Load tools from yaml file""" + yml_files = schema_path.glob("*.yml") + for yml_file in yml_files: + module = yml_file.stem + with open(yml_file, "r", encoding="utf-8") as f: + self.available_tools[module] = yaml.safe_load(f) + + def _parse_recommend_tools(self, module: str, recommend_tools: list) -> dict: """ Parses and validates a list of recommended tools, and retrieves their schema from registry. Args: module (str): The module name for querying tools in the registry. - recommend_tools (list): A list of lists of recommended tools for each step. + recommend_tools (list): A list of recommended tools. Returns: - List[Dict]: A list of dicts of valid tool schemas. + dict: A dict of valid tool schemas. """ valid_tools = [] - available_tools = registry.get_all_by_module(module).keys() + available_tools = self.available_tools[module].keys() for tool in recommend_tools: if tool in available_tools: valid_tools.append(tool) - tool_catalog = registry.get_schemas(module, valid_tools) + tool_catalog = {tool: self.available_tools[module][tool] for tool in valid_tools} return tool_catalog async def _tool_recommendation( - self, - task: str, - code_steps: str, - available_tools: list + self, + task: str, + code_steps: str, + available_tools: dict, ) -> list: """ Recommend tools for the specified task. Args: - context (List[Message]): Action output history, source action denoted by Message.cause_by + task (str): the task to recommend tools for code_steps (str): the code steps to generate the full code for the task - available_tools (list): the available tools for the task + available_tools (dict): the available tools description Returns: list: recommended tools for the specified task @@ -149,27 +164,23 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): return recommend_tools async def run( - self, - context: List[Message], - plan: Plan = None, - code_steps: str = "", - column_info: str = "", - **kwargs, - ) -> str: + self, + context: List[Message], + plan: Plan = None, + code_steps: str = "", + column_info: str = "", + **kwargs, + ) -> Tuple[List[Message], str]: task_type = plan.current_task.task_type - available_tools = registry.get_all_schema_by_module(task_type) + available_tools = self.available_tools.get(task_type, {}) special_prompt = ML_SPECIFIC_PROMPT.get(task_type, "") - column_names = kwargs.get("column_names", {}) finished_tasks = plan.get_finished_tasks() code_context = [remove_comments(task.code) for task in finished_tasks] code_context = "\n\n".join(code_context) if len(available_tools) > 0: - available_tools = [ - {k: tool[k] for k in ["name", "description"] if k in tool} - for tool in available_tools - ] + available_tools = {k: v["description"] for k, v in available_tools.items()} recommend_tools = await self._tool_recommendation( plan.current_task.instruction, @@ -180,46 +191,27 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): logger.info(f"Recommended tools: \n{recommend_tools}") module_name = ML_MODULE_MAP[task_type] - output_desc = TOOL_OUTPUT_DESC.get(task_type, "") - new_code = "" - - for idx, tool in enumerate(recommend_tools): - hist_info = f"Previous finished code is \n\n ```Python {code_context} ``` \n\n " - - prompt = TOOL_USAGE_PROMPT.format( - goal=plan.current_task.instruction, - context=hist_info, - code_steps=code_steps, - column_names=column_names, - special_prompt=special_prompt, - module_name=module_name, - output_desc=output_desc, - function_catalog=tool_catalog[idx], - ) - - tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS) - - rsp = await self.llm.aask_code(prompt, **tool_config) - logger.info(f"rsp is: {rsp}") - # final_code = final_code + "\n\n" + rsp["code"] - # final_code[key] = rsp["code"] - new_code = new_code + "\n\n" + rsp["code"] - code_context = code_context + "\n\n" + rsp["code"] - return new_code - - else: - hist_info = f"Previous finished code is \n\n ```Python {code_context} ``` \n\n " - - prompt = GENERATE_CODE_PROMPT.format( - goal=plan.current_task.instruction, - context=hist_info, - code_steps=code_steps, + prompt = TOOL_USAGE_PROMPT.format( + user_requirement=plan.goal, + history_code=code_context, + current_task=plan.current_task.instruction, + column_info=column_info, special_prompt=special_prompt, - # column_names=column_names + code_steps=code_steps, + module_name=module_name, + tool_catalog=tool_catalog, + ) + else: + prompt = GENERATE_CODE_PROMPT.format( + user_requirement=plan.goal, + history_code=code_context, + current_task=plan.current_task.instruction, + column_info=column_info, + special_prompt=special_prompt, + code_steps=code_steps, ) - tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS) - logger.info(f"prompt is: {prompt}") - rsp = await self.llm.aask_code(prompt, **tool_config) - logger.info(f"rsp is: {rsp}") - return rsp["code"] + tool_config = create_func_config(CODE_GENERATOR_WITH_TOOLS) + rsp = await self.llm.aask_code(prompt, **tool_config) + context = [Message(content=prompt, role="user")] + return context, rsp["code"] diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index 45fe728dd..20589079d 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -1,5 +1,6 @@ import json import re +from datetime import datetime from typing import List import fire @@ -10,12 +11,16 @@ from metagpt.actions.execute_code import ExecutePyCode from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools from metagpt.actions.write_code_steps import WriteCodeSteps from metagpt.actions.write_plan import WritePlan -from metagpt.const import DATA_PATH +from metagpt.const import DATA_PATH, PROJECT_ROOT from metagpt.logs import logger -from metagpt.prompts.ml_engineer import GEN_DATA_DESC_PROMPT +from metagpt.prompts.ml_engineer import ( + GEN_DATA_DESC_PROMPT, + UPDATE_DATA_COLUMNS, + PRINT_DATA_COLUMNS +) from metagpt.roles import Role from metagpt.schema import Message, Plan -from metagpt.utils.common import CodeParser +from metagpt.utils.common import CodeParser, remove_comments, create_func_config from metagpt.actions.debug_code import DebugCode STRUCTURAL_CONTEXT = """ @@ -57,34 +62,6 @@ def remove_escape_and_color_codes(input_str): return result -def read_data(file: str) -> pd.DataFrame: - if file.endswith(".csv"): - df = pd.read_csv(file, sep=",") - sep_list = [";", "\t", ":", " ", "|"] - for sep in sep_list: - if df.shape[1] == 1: - df = pd.read_csv(file, sep=sep) - else: - break - else: - raise ValueError(f"Unsupported file type: {file}") - return df - - -def get_column_info(df: pd.DataFrame) -> str: - data = [] - for i in df.columns: - nan_freq = float("%.2g" % (df[i].isna().mean() * 100)) - n_unique = df[i].nunique() - data.append([i, df[i].dtype, nan_freq, n_unique]) - - samples = pd.DataFrame( - data, - columns=["Column_name", "Data_type", "NaN_Frequency(%)", "N_unique"], - ) - return samples.to_string(index=False) - - class AskReview(Action): async def run(self, context: List[Message], plan: Plan = None): logger.info("Current overall plan:") @@ -108,26 +85,20 @@ class AskReview(Action): return rsp, confirmed -class GenerateDataDesc(Action): - async def run(self, file: str) -> dict: - data_desc = {} - df = read_data(file) - data_head = df.head().to_dict(orient="list") - data_head = json.dumps(data_head, indent=4, ensure_ascii=False) - prompt = GEN_DATA_DESC_PROMPT.replace("{data_head}", data_head) - rsp = await self._aask(prompt) - rsp = CodeParser.parse_code(block=None, text=rsp) - rsp = json.loads(rsp) - data_desc["path"] = file - data_desc["data_desc"] = rsp["data_desc"] - data_desc["column_desc"] = rsp["column_desc"] - data_desc["column_info"] = get_column_info(df) - return data_desc +class UpdateDataColumns(Action): + async def run(self, plan: Plan = None) -> dict: + finished_tasks = plan.get_finished_tasks() + code_context = [remove_comments(task.code) for task in finished_tasks] + code_context = "\n\n".join(code_context) + prompt = UPDATE_DATA_COLUMNS.format(history_code=code_context) + tool_config = create_func_config(PRINT_DATA_COLUMNS) + rsp = await self.llm.aask_code(prompt, **tool_config) + return rsp class MLEngineer(Role): def __init__( - self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, data_path: str = None + self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, ): super().__init__(name=name, profile=profile, goal=goal) self._set_react_mode(react_mode="plan_and_act") @@ -136,13 +107,9 @@ class MLEngineer(Role): self.use_code_steps = True self.execute_code = ExecutePyCode() self.auto_run = auto_run - self.data_path = data_path self.data_desc = {} async def _plan_and_act(self): - if self.data_path: - self.data_desc = await self._generate_data_desc() - # create initial plan and update until confirmation await self._update_plan() @@ -163,25 +130,27 @@ class MLEngineer(Role): task.code_steps = code_steps self.plan.finish_current_task() self.working_memory.clear() - - if "print(df_processed.info())" in code: - self.data_desc["column_info"] = result + + success, new_code = await self._update_data_columns() + if success: + task.code = task.code + "\n\n" + new_code else: # update plan according to user's feedback and to take on changed tasks await self._update_plan() - - finished_tasks = self.plan.get_finished_tasks() - if len(finished_tasks) == len(self.plan.tasks): - code_context = [task.code for task in finished_tasks] - code_context = "\n\n".join(code_context) - result, success = await self.execute_code.run(code_context) - # truncated the result - print(truncate(result)) - - async def _generate_data_desc(self): - data_desc = await GenerateDataDesc().run(self.data_path) - return data_desc - + + time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + self.execute_code.save_notebook(f"{DATA_PATH}/notebooks/ml_{time}.ipynb") + + async def _update_data_columns(self): + rsp = await UpdateDataColumns().run(self.plan) + is_update, code = rsp["is_update"], rsp["code"] + success = False + if is_update: + result, success = await self.execute_code.run(code) + if success: + self.data_desc["column_info"] = result + return success, code + async def _write_and_exec_code(self, max_retry: int = 3): code_steps = ( await WriteCodeSteps().run(self.plan) @@ -192,6 +161,7 @@ class MLEngineer(Role): counter = 0 improve_code = "" success = False + debug_context = [] finished_tasks = self.plan.get_finished_tasks() code_context = [task.code for task in finished_tasks] @@ -200,37 +170,38 @@ class MLEngineer(Role): code_result = "\n\n".join(code_result) while not success and counter < max_retry: - if counter == 0: - context = self.get_useful_memories() - else: - # context = self.get_useful_memories() - # logger.info(f"context {context}") + context = self.get_useful_memories() + + if counter > 0: improve_code = await DebugCode().run(plan=self.plan.current_task.instruction, - finished_code=code_context, - finished_code_result=code_result, + # finished_code=code_context, + # finished_code_result=code_result, code=code, - runtime_result=self.working_memory.get()) - - if not self.use_tools or self.plan.current_task.task_type == "other": + runtime_result=self.working_memory.get(), + context=debug_context) + + if improve_code != "": + code = improve_code + logger.info(f"new code \n{improve_code}") + cause_by = DebugCode + elif not self.use_tools or self.plan.current_task.task_type == "other": logger.info("Write code with pure generation") - code = await WriteCodeByGenerate().run( context=context, plan=self.plan, code_steps=code_steps, temperature=0.0 ) + debug_context = [self.get_useful_memories(task_exclude_field={'result', 'code_steps'})[0]] cause_by = WriteCodeByGenerate else: logger.info("Write code with tools") - - if improve_code != "": - code = improve_code - logger.info(f"new code {code}") - cause_by = DebugCode - else: - code = await WriteCodeWithTools().run( - context=context, plan=self.plan, code_steps=code_steps, **{"column_names": {}} - ) - - cause_by = WriteCodeWithTools + schema_path = PROJECT_ROOT / "metagpt/tools/functions/schemas" + tool_context, code = await WriteCodeWithTools(schema_path=schema_path).run( + context=context, + plan=self.plan, + code_steps=code_steps, + column_info=self.data_desc.get("column_info", ""), + ) + debug_context = tool_context + cause_by = WriteCodeWithTools self.working_memory.add( Message(content=code, role="assistant", cause_by=cause_by) @@ -238,9 +209,7 @@ class MLEngineer(Role): # debug on code, run on runcode with finished code and new_df # runcode = code_context + "\n\n" + code - runcode = code - - result, success = await self.execute_code.run(runcode) + result, success = await self.execute_code.run(code) # truncated the result print(truncate(result)) @@ -289,12 +258,12 @@ class MLEngineer(Role): self.plan.add_tasks(tasks) self.working_memory.clear() - def get_useful_memories(self) -> List[Message]: + def get_useful_memories(self, task_exclude_field: set = None) -> List[Message]: """find useful memories only to reduce context length and improve performance""" # TODO dataset description , code steps user_requirement = self.plan.goal tasks = json.dumps( - [task.dict() for task in self.plan.tasks], indent=4, ensure_ascii=False + [task.dict(exclude=task_exclude_field) for task in self.plan.tasks], indent=4, ensure_ascii=False ) current_task = self.plan.current_task.json() if self.plan.current_task else {} context = STRUCTURAL_CONTEXT.format( @@ -321,12 +290,13 @@ if __name__ == "__main__": # requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy." - data_path = f"{DATA_PATH}/titanic" - requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'." - - - async def main(requirement: str = requirement, auto_run: bool = True, data_path: str = ""): - role = MLEngineer(goal=requirement, auto_run=auto_run, data_path=data_path) + # data_path = f"{DATA_PATH}/titanic" + # requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'." + # requirement = f"Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy" + data_path = f"{DATA_PATH}/icr-identify-age-related-conditions" + requirement = f"This is a medical dataset with over fifty anonymized health characteristics linked to three age-related conditions. Your goal is to predict whether a subject has or has not been diagnosed with one of these conditions.The target column is Class. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report f1 score on the eval data. Train data path: {data_path}/split_train.csv, eval data path: {data_path}/split_eval.csv." + async def main(requirement: str = requirement, auto_run: bool = True): + role = MLEngineer(goal=requirement, auto_run=auto_run) await role.run(requirement)