add GenerateDataDesc action

This commit is contained in:
lidanyang 2023-12-06 14:16:48 +08:00
parent 20a918bf39
commit 962632cd15

View file

@ -1,25 +1,38 @@
from typing import Dict, List, Union
import glob
import json
import subprocess
from typing import List
import fire
import pandas as pd
import re
from metagpt.roles import Role
from metagpt.actions import Action
from metagpt.schema import Message, Task, Plan
from metagpt.logs import logger
from metagpt.actions.write_plan import WritePlan
from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.actions.write_analysis_code import WriteCodeByGenerate, WriteCodeWithTools
from metagpt.actions.write_plan import WritePlan
from metagpt.actions.write_task_guide import WriteTaskGuide
from metagpt.logs import logger
from metagpt.prompts.ml_engineer import GEN_DATA_DESC_PROMPT
from metagpt.roles import Role
from metagpt.schema import Message, Plan
from metagpt.utils.common import CodeParser
# Template for the structured context text. It is filled in with str.format()
# using the keys user_requirement, data_desc, tasks and current_task, and the
# result becomes the content of a user-role Message. The final section is a
# fixed list of package names.
STRUCTURAL_CONTEXT = """
## User Requirement
{user_requirement}
## Dataset Description
{data_desc}
## Current Plan
{tasks}
## Current Task
{current_task}
## Packages Installed
scikit-learn
pandas
numpy
lightgbm
xgboost
catboost
"""
@ -43,6 +56,50 @@ def remove_escape_and_color_codes(input_str):
return result
def read_data(file: str) -> pd.DataFrame:
    """Load a CSV file, guessing the delimiter when a comma does not split it.

    The file is first parsed with ``,`` as separator. If that yields a single
    column, the separators ``;``, tab, ``:``, space and ``|`` are tried in
    that order and the first parse producing more than one column is returned.

    Args:
        file: Path of the file to read; must end with ``.csv``.

    Returns:
        The parsed DataFrame. When no separator produces more than one
        column, the original comma-separated parse is returned.

    Raises:
        ValueError: If ``file`` does not end with ``.csv``.
    """
    if not file.endswith(".csv"):
        raise ValueError(f"Unsupported file type: {file}")

    df = pd.read_csv(file, sep=",")
    if df.shape[1] != 1:
        return df

    for sep in (";", "\t", ":", " ", "|"):
        try:
            candidate = pd.read_csv(file, sep=sep)
        except pd.errors.ParserError:
            # A genuinely single-column file can be ragged under a wrong
            # separator (e.g. free text split on spaces); that only means
            # this separator is not the right one.
            continue
        if candidate.shape[1] > 1:
            return candidate

    # Nothing split the data further: keep the plain comma parse.
    return df
def get_samples(df: pd.DataFrame) -> str:
    """Render a per-column summary table of *df* as plain text.

    Each row of the table describes one column: its name, dtype, percentage
    of missing values (two significant digits), number of distinct values and
    a few sample values. Samples come from five rows drawn with a fixed seed
    when the frame has more than five rows, otherwise from every row. Sample
    values of ``float64`` columns are rounded to two decimals, with missing
    values shown as ``None``.

    Args:
        df: The frame to summarise.

    Returns:
        The summary table formatted with ``DataFrame.to_string(index=False)``.
    """
    preview = df.sample(5, random_state=0) if len(df) > 5 else df

    rows = []
    for column in list(preview):
        full_col = df[column]
        shown_col = preview[column]

        # Statistics are computed on the full column, samples on the preview.
        missing_pct = float("%.2g" % (full_col.isna().mean() * 100))
        distinct = full_col.nunique()

        values = shown_col.tolist()
        if str(full_col.dtype) == "float64":
            values = [None if pd.isna(v) else round(v, 2) for v in values]

        rows.append([shown_col.name, full_col.dtype, missing_pct, distinct, values])

    header = [
        "Column_name",
        "Data_type",
        "NaN_Frequency(%)",
        "N_unique",
        "Samples",
    ]
    return pd.DataFrame(rows, columns=header).to_string(index=False)
class AskReview(Action):
async def run(self, context: List[Message], plan: Plan = None):
logger.info("Current overall plan:")
@ -66,24 +123,47 @@ class AskReview(Action):
return rsp, confirmed
class WriteTaskGuide(Action):
    """Placeholder action that currently yields an empty task guide."""

    async def run(self, task_instruction: str, data_desc: str = "") -> str:
        """Return an empty string; both arguments are accepted but unused."""
        guide = ""
        return guide
# class WriteTaskGuide(Action):
# async def run(self, task_instruction: str, data_desc: dict = None) -> str:
# return ""
class GenerateDataDesc(Action):
    """Action that builds a description entry for each given data file."""

    async def run(self, files: list) -> dict:
        """Describe every file in *files*.

        Each file is loaded with ``read_data``; its first rows are dumped to
        JSON and substituted into ``GEN_DATA_DESC_PROMPT``; the reply obtained
        through ``self._aask`` is passed through ``CodeParser.parse_code``.

        Args:
            files: Paths of the data files to describe.

        Returns:
            A dict keyed by file base name. Each value is a dict with the
            keys ``path`` (the path as given), ``description`` (the parsed
            reply) and ``column_info`` (the table from ``get_samples``).
        """
        # Imported locally so this block does not depend on a module-level import.
        import os

        data_desc = {}
        for file in files:
            df = read_data(file)
            # basename honours the platform's path separators, whereas
            # splitting on "/" returns the whole path for backslash paths.
            file_name = os.path.basename(file)
            head_json = json.dumps(
                df.head().to_dict(orient="list"), indent=4, ensure_ascii=False
            )
            # NOTE(review): str.replace rather than str.format — presumably the
            # template contains other literal braces; confirm against the prompt.
            prompt = GEN_DATA_DESC_PROMPT.replace("{data_head}", head_json)
            rsp = await self._aask(prompt)
            data_desc[file_name] = {
                "path": file,
                "description": CodeParser.parse_code(block=None, text=rsp),
                "column_info": get_samples(df),
            }
        return data_desc
class MLEngineer(Role):
def __init__(
    self, name="ABC", profile="MLEngineer", goal="", auto_run: bool = False, data_path: str = None
):
    """Initialise the role in "plan_and_act" react mode.

    Args:
        name: Role name forwarded to the base class.
        profile: Role profile forwarded to the base class.
        goal: Goal forwarded to the base class and used to create the Plan.
        auto_run: Stored on the instance as ``self.auto_run``.
        data_path: Optional directory; when set, its ``*.csv`` files are
            described before planning and the result kept in
            ``self.data_desc``.
    """
    super().__init__(name=name, profile=profile, goal=goal)
    self._set_react_mode(react_mode="plan_and_act")
    self.plan = Plan(goal=goal)
    # Tool-based code writing and task guides are both switched on.
    self.use_tools = True
    self.use_task_guide = True
    self.execute_code = ExecutePyCode()
    self.auto_run = auto_run
    self.data_path = data_path
    # Filled in by _plan_and_act when data_path is set.
    self.data_desc = {}
async def _plan_and_act(self):
if self.data_path:
self.data_desc = await self._generate_data_desc()
# create initial plan and update until confirmation
await self._update_plan()
@ -108,9 +188,14 @@ class MLEngineer(Role):
# update plan according to user's feedback and to take on changed tasks
await self._update_plan()
async def _generate_data_desc(self):
    """Describe every ``*.csv`` file directly under ``self.data_path``.

    Returns:
        The dict produced by ``GenerateDataDesc().run`` for those files.
    """
    # glob returns matches in arbitrary order; sorting keeps the order of the
    # resulting description stable between runs and machines.
    files = sorted(glob.glob(self.data_path + "/*.csv"))
    return await GenerateDataDesc().run(files=files)
async def _write_and_exec_code(self, max_retry: int = 3):
task_guide = (
await WriteTaskGuide().run(self.plan.current_task.instruction)
await WriteTaskGuide().run(self.plan)
if self.use_task_guide
else ""
)
@ -126,14 +211,16 @@ class MLEngineer(Role):
# breakpoint()
if not self.use_tools or self.plan.current_task.task_type == "other":
logger.info("Write code with pure generation")
# code = "print('abc')"
code = await WriteCodeByGenerate().run(
context=context, plan=self.plan, task_guide=task_guide, temperature=0.0
)
cause_by = WriteCodeByGenerate
else:
logger.info("Write code with tools")
code = await WriteCodeWithTools().run(
context=context, plan=self.plan, task_guide=task_guide, data_desc=""
context=context, plan=self.plan, task_guide=task_guide
)
cause_by = WriteCodeWithTools
@ -192,7 +279,10 @@ class MLEngineer(Role):
)
current_task = self.plan.current_task.json() if self.plan.current_task else {}
context = STRUCTURAL_CONTEXT.format(
user_requirement=user_requirement, tasks=tasks, current_task=current_task
user_requirement=user_requirement,
data_desc=self.data_desc,
tasks=tasks,
current_task=current_task
)
context_msg = [Message(content=context, role="user")]
@ -204,14 +294,17 @@ class MLEngineer(Role):
if __name__ == "__main__":
    # Alternative requirements kept for manual experiments.
    # requirement = "Run data analysis on sklearn Iris dataset, include a plot.."
    # requirement = "Run data analysis on sklearn Diabetes dataset, include a plot"
    # requirement = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy"
    # requirement = "Run data analysis on sklearn Wisconsin Breast Cancer dataset, include a plot, train a model to predict targets (20% as validation), and show validation accuracy"
    # requirement = "Run EDA and visualization on this dataset, train a model to predict survival, report metrics on validation set (20%), dataset: workspace/titanic/train.csv"

    # Defaults used when no command-line arguments are given.
    requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy."
    data_path = "/data/lidanyang/tabular_data/titanic"

    async def main(requirement: str = requirement, auto_run: bool = True, data_path: str = data_path):
        """Create an MLEngineer for *requirement* and run it on *data_path*."""
        role = MLEngineer(goal=requirement, auto_run=auto_run, data_path=data_path)
        await role.run(requirement)

    # Expose main's parameters as command-line flags.
    fire.Fire(main)