diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index 65583638e..15edb2b06 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -1,25 +1,38 @@ -from typing import Dict, List, Union +import glob import json -import subprocess +from typing import List import fire +import pandas as pd import re -from metagpt.roles import Role from metagpt.actions import Action -from metagpt.schema import Message, Task, Plan -from metagpt.logs import logger -from metagpt.actions.write_plan import WritePlan -from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools from metagpt.actions.execute_code import ExecutePyCode +from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools +from metagpt.actions.write_plan import WritePlan +from metagpt.actions.write_task_guide import WriteTaskGuide +from metagpt.logs import logger +from metagpt.prompts.ml_engineer import GEN_DATA_DESC_PROMPT +from metagpt.roles import Role +from metagpt.schema import Message, Plan +from metagpt.utils.common import CodeParser STRUCTURAL_CONTEXT = """ ## User Requirement {user_requirement} +## Dataset Description +{data_desc} ## Current Plan {tasks} ## Current Task {current_task} +## Packages Installed +scikit-learn +pandas +numpy +lightgbm +xgboost +catboost """ @@ -43,6 +56,50 @@ def remove_escape_and_color_codes(input_str): return result +def read_data(file: str) -> pd.DataFrame: + if file.endswith(".csv"): + df = pd.read_csv(file, sep=",") + sep_list = [";", "\t", ":", " ", "|"] + for sep in sep_list: + if df.shape[1] == 1: + df = pd.read_csv(file, sep=sep) + else: + break + else: + raise ValueError(f"Unsupported file type: {file}") + return df + + +def get_samples(df: pd.DataFrame) -> str: + data = [] + + if len(df) > 5: + df_ = df.sample(5, random_state=0) + else: + df_ = df + + for i in list(df_): + nan_freq = float("%.2g" % (df[i].isna().mean() * 100)) + n_unique = df[i].nunique() + s = df_[i].tolist() + + if str(df[i].dtype) == "float64": + s = [round(sample, 2) if not pd.isna(sample) else None for sample in s] + + data.append([df_[i].name, df[i].dtype, nan_freq, n_unique, s]) + samples = pd.DataFrame( + data, + columns=[ + "Column_name", + "Data_type", + "NaN_Frequency(%)", + "N_unique", + "Samples", + ], + ) + return samples.to_string(index=False) + + class AskReview(Action): async def run(self, context: List[Message], plan: Plan = None): logger.info("Current overall plan:") @@ -66,24 +123,47 @@ class AskReview(Action): return rsp, confirmed -class WriteTaskGuide(Action): - async def run(self, task_instruction: str, data_desc: str = "") -> str: - return "" +# class WriteTaskGuide(Action): +# async def run(self, task_instruction: str, data_desc: dict = None) -> str: +# return "" + + +class GenerateDataDesc(Action): + async def run(self, files: list) -> dict: + data_desc = {} + for file in files: + df = read_data(file) + file_name = file.split("/")[-1] + data_head = df.head().to_dict(orient="list") + data_head = json.dumps(data_head, indent=4, ensure_ascii=False) + prompt = GEN_DATA_DESC_PROMPT.replace("{data_head}", data_head) + rsp = await self._aask(prompt) + rsp = CodeParser.parse_code(block=None, text=rsp) + data_desc[file_name] = {} + data_desc[file_name]["path"] = file + data_desc[file_name]["description"] = rsp + data_desc[file_name]["column_info"] = get_samples(df) + return data_desc class MLEngineer(Role): def __init__( - self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False + self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, data_path: str = None ): super().__init__(name=name, profile=profile, goal=goal) self._set_react_mode(react_mode="plan_and_act") self.plan = Plan(goal=goal) - self.use_tools = False - self.use_task_guide = False + self.use_tools = True + self.use_task_guide = True self.execute_code = ExecutePyCode() self.auto_run = auto_run + self.data_path = data_path + self.data_desc = {} async def _plan_and_act(self): + if self.data_path: + self.data_desc = await self._generate_data_desc() + # create initial plan and update until confirmation await self._update_plan() @@ -108,9 +188,14 @@ class MLEngineer(Role): # update plan according to user's feedback and to take on changed tasks await self._update_plan() + async def _generate_data_desc(self): + files = glob.glob(self.data_path + "/*.csv") + data_desc = await GenerateDataDesc().run(files=files) + return data_desc + async def _write_and_exec_code(self, max_retry: int = 3): task_guide = ( - await WriteTaskGuide().run(self.plan.current_task.instruction) + await WriteTaskGuide().run(self.plan) if self.use_task_guide else "" ) @@ -126,14 +211,16 @@ class MLEngineer(Role): # breakpoint() if not self.use_tools or self.plan.current_task.task_type == "other": + logger.info("Write code with pure generation") # code = "print('abc')" code = await WriteCodeByGenerate().run( context=context, plan=self.plan, task_guide=task_guide, temperature=0.0 ) cause_by = WriteCodeByGenerate else: + logger.info("Write code with tools") code = await WriteCodeWithTools().run( - context=context, plan=self.plan, task_guide=task_guide, data_desc="" + context=context, plan=self.plan, task_guide=task_guide ) cause_by = WriteCodeWithTools @@ -192,7 +279,10 @@ class MLEngineer(Role): ) current_task = self.plan.current_task.json() if self.plan.current_task else {} context = STRUCTURAL_CONTEXT.format( - user_requirement=user_requirement, tasks=tasks, current_task=current_task + user_requirement=user_requirement, + data_desc=self.data_desc, + tasks=tasks, + current_task=current_task ) context_msg = [Message(content=context, role="user")] @@ -204,14 +294,17 @@ class MLEngineer(Role): if __name__ == "__main__": - requirement = "Run data analysis on sklearn Iris dataset, include a plot" + # requirement = "Run data analysis on sklearn Iris dataset, include a plot.." # requirement = "Run data analysis on sklearn Diabetes dataset, include a plot" # requirement = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy" # requirement = "Run data analysis on sklearn Wisconsin Breast Cancer dataset, include a plot, train a model to predict targets (20% as validation), and show validation accuracy" # requirement = "Run EDA and visualization on this dataset, train a model to predict survival, report metrics on validation set (20%), dataset: workspace/titanic/train.csv" - async def main(requirement: str = requirement, auto_run: bool = False): - role = MLEngineer(goal=requirement, auto_run=auto_run) + requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy." + data_path = "/data/lidanyang/tabular_data/titanic" + + async def main(requirement: str = requirement, auto_run: bool = True, data_path: str = data_path): + role = MLEngineer(goal=requirement, auto_run=auto_run, data_path=data_path) await role.run(requirement) fire.Fire(main)