diff --git a/expo/MCTS.py b/expo/MCTS.py new file mode 100644 index 000000000..9026e09b4 --- /dev/null +++ b/expo/MCTS.py @@ -0,0 +1,347 @@ +import random +import math +import os +import pandas as pd +from expo.research_assistant import ResearchAssistant +from expo.insights.InsightGenerate import InsightGenerator +from expo.dataset import get_split_dataset_path +from expo.evaluation.evaluation import evaluate_score +from expo.utils import mcts_logger, load_execute_notebook, generate_task_requirement, get_exp_pool_path + +from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender +from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info +import numpy as np +import pickle + +def initialize_di_root_node(task, data_config, low_is_better=False, reflection=True, name=""): + start_task_id = 2 + state = create_initial_state(task, start_task_id=start_task_id, data_config=data_config, low_is_better=low_is_better, name=name) + role = ResearchAssistant(node_id="0", start_task_id=start_task_id, use_reflection=reflection, role_dir=state["node_dir"]) + return role, Node(parent=None, state=state, action=None, value=0) + + +def create_initial_state(task, start_task_id, data_config, low_is_better, name): + initial_state = { + "task": task, + "work_dir": data_config["work_dir"], + "node_dir": os.path.join(data_config["work_dir"], data_config["role_dir"], f"{task}{name}"), + "dataset_config": data_config["datasets"][task], + "datasets_dir": get_split_dataset_path(task, data_config), + "exp_pool_path": get_exp_pool_path(task, data_config, pool_name="ds_analysis_pool"), + "requirement": generate_task_requirement(task, data_config), + "has_run": False, + "start_task_id": start_task_id, + "low_is_better": low_is_better, + } + return initial_state + + +class Node(): + state : dict = {} + action : str = None + value : float = 0 + visited : int = 0 + children : list = [] + parent = None + + def __init__(self, parent=None, state = None, action=None, value = 0, max_depth=4, **kwargs): + self.state = state + self.action = action + self.value = value + self.raw_value = 0 + self.raw_reward = dict() + self.parent = parent + self.children = [] + self.max_depth = max_depth + self.depth = self.generate_depth() + self.id = self.generate_id() + if self.parent is not None: + self.save_node() + + def avg_value(self): + if self.visited == 0: + return 0 + return self.value / self.visited + + def __hash__(self): + return hash(self.id) + + def save_node(self): + os.makedirs(self.state["node_dir"], exist_ok=True) + with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'wb') as f: + pickle.dump(self, f) + + def load_node(self): + with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'rb') as f: + return pickle.load(f) + + def get_depth(self): + return self.depth + + def generate_depth(self): + if self.parent is None: + return 0 + else: + return self.parent.depth + 1 + + def generate_id(self): + if self.parent is None: + return "0" + else: + num_sibling = len(self.parent.children) + return f"{self.parent.id}-{num_sibling}" + + def is_terminal(self): + return int(self.state["start_task_id"]) == self.max_depth + 1 + + def is_fully_expanded(self): + return len(self.children) > 0 + + def add_child(self, child_node): + self.children.append(child_node) + + def update(self, reward:dict, child_node=None): + if child_node is not None: + child_role = child_node.load_role() + role = self.load_role() + role.update_til_start_task(child_role) + role.save_state() + 
else: + self.raw_value = reward["test_score"] + self.value += reward["score"] + self.visited += 1 + self.save_node() + + def get_role_path(self): + fname = f"Node-{self.id}.json" + role_path = os.path.join(self.state["node_dir"], fname) + return role_path + + def load_role(self): + role_dict = read_json_file(self.get_role_path()) + if role_dict.get('tool_recommender') is None: + role_dict['tool_recommender'] = ToolRecommender() + elif isinstance(role_dict.get('tool_recommender', {}).get('tools'), dict): + role_dict['tool_recommender']['tools'] = list(role_dict['tool_recommender']['tools'].keys()) + role = ResearchAssistant(**role_dict) + if self.parent is not None: # TODO: Check this + parent_role = self.parent.load_role() + role.update_til_start_task(parent_role, backward=False) + role.remap_tasks() + return role + + def save_new_role(self, role: ResearchAssistant): + role.node_id = self.id + role.start_task_id = self.state['start_task_id'] + role.state_saved = False + role.change_next_instruction(self.action) + mcts_logger.log("MCTS", f"Saving new role: {role.node_id}") + role.save_state(static_save=True) + + async def expand(self, max_children): + if self.is_fully_expanded(): + return + insight_generator = InsightGenerator() + role = self.load_role() + original_instruction = role.get_next_instruction() + insights = await insight_generator.generate_new_instructions(task_id=role.start_task_id + 1, + original_instruction=original_instruction, + max_num=max_children, + file_path=self.state["exp_pool_path"]) + new_state = self.state.copy() + new_state['start_task_id'] += 1 + for insight in insights: + new_role = role.model_copy() + node = Node(parent=self, state=new_state, action=insight, value=0) + node.save_new_role(new_role) + self.add_child(node) + + # def evaluate_test(self): + # prediction_fpath = os.path.join(self.state["work_dir"], self.state["task"], "predictions.csv") + # predictions = pd.read_csv(prediction_fpath)["target"] + # # copy predictions.csv to the node_dir + # predictions_node_fpath = os.path.join(self.state["node_dir"], f"Node-{self.id}-predictions.csv") + # predictions.to_csv(predictions_node_fpath, index=False) + # # load test_target.csv + # split_datasets_dir = self.state["datasets_dir"] + # gt = pd.read_csv(os.path.join(split_datasets_dir["test_target"]))["target"] + # metric = self.state["dataset_config"]["metric"] + # return evaluate_score(predictions, gt, metric) + + def evaluate_prediction(self, split): + pred_path = os.path.join(self.state["work_dir"], self.state["task"], f"{split}-predictions.csv") + pred_node_path = os.path.join(self.state["node_dir"], f"Node-{self.id}-{split}-predictions.csv") + gt_path = os.path.join(self.state["datasets_dir"][f"{split}_target"]) + preds = pd.read_csv(pred_path)["target"] + preds.to_csv(pred_node_path, index=False) + gt = pd.read_csv(gt_path)["target"] + metric = self.state["dataset_config"]["metric"] + return evaluate_score(preds, gt, metric) + + def evaluate_simulation(self, score_dict): + scores = { + "dev_score": self.evaluate_prediction("dev"), + "test_score": self.evaluate_prediction("test") + } + score_dict.update(scores) + return score_dict
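+ + # NOTE (illustrative): a simulation reward is a dict of the form + # {"train_score": 0.93, "dev_score": 0.81, "test_score": 0.79, "score": 0.81}, + # where "score" mirrors "dev_score" and is what update() accumulates into + # node.value during backpropagation, while "test_score" is stored as the + # node's raw_value for final reporting. The example numbers are made up.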
+ async def run_node(self, role=None): + if self.is_terminal() and role is not None: + if role.state_saved: + return self.raw_reward + + if not role: + role = self.load_role() + await load_execute_notebook(role) # execute previous notebook's code + await role.run(with_message='continue') + else: + await role.run(with_message=self.state['requirement']) + score_dict = await role.get_score() + score_dict = self.evaluate_simulation(score_dict) + self.raw_reward = score_dict + + if self.state["low_is_better"]: + # normalize the score to (0, 1] so that higher is always better + def normalize_score(score): + return 1 / (1 + score) + score_dict = {k: normalize_score(v) for k, v in score_dict.items()} + return score_dict + + +class MCTS(): + #data_path + root_node : Node = None + children : dict = {} + max_depth : int = 5 + c_explore : float = 1.4 + c_unvisited : float = 0.8 + + def __init__(self, root_node, max_depth): + self.root_node = root_node + self.max_depth = max_depth + + def select(self, node: Node): + node = self.best_child() + mcts_logger.log("MCTS", f"Selected leaf node id: {node.id}") + return node + + def best_child(self): + def uct(node: Node): + n_visits = node.visited if node.visited else self.c_unvisited + avg_value = node.avg_value() if node.visited else node.value/self.c_unvisited + return avg_value + self.c_explore * math.sqrt(math.log(node.parent.visited) / n_visits) + if len(self.children) == 0: + return self.root_node + all_children = [child for children in self.children.values() for child in children] + return max(all_children, key=uct) + + async def expand(self, node : Node, max_children=4): + await node.expand(max_children) + if node not in self.children or not self.children[node]: + self.children[node] = node.children + return node.children + + async def simulate(self, node : Node, role=None): + "Returns the reward for a random simulation (to completion) of `node`" + while node.children: + node = random.choice(node.children) + reward = await node.run_node(role) + return reward + + + def backpropagate(self, node : Node, reward): + child_node = node + node.update(reward) + node = node.parent + while node is not None: + node.update(reward, child_node) + node, child_node = node.parent, node + + def best_path(self, root : Node): + best_child = root + best_score = 0 + def bfs(node : Node, best_score, best_child : Node): + if node not in self.children: + return best_score, best_child + for child in self.children[node]: + print(child.id, child.raw_value) + if child.raw_value > best_score: + best_score = child.raw_value + best_child = child + best_score, best_child = bfs(child, best_score, best_child) + return best_score, best_child + best_score, best_child = bfs(root, best_score, best_child) + mcts_logger.log("MCTS", f"Best Score: {best_score}, Best Node ID: {best_child.id}") + return best_child + + def get_num_simulations(self): + return self.root_node.visited + + async def search(self, task, data_config, name, + rollout=3, load_tree=False, low_is_better=False, reflection=False): + + role, root = initialize_di_root_node(task, data_config, low_is_better=low_is_better, reflection=reflection, name=name) + self.root_node = root + tree_loaded = False + if load_tree: + tree_loaded = self.load_tree() + mcts_logger.log("MCTS", f"Number of simulations: {self.get_num_simulations()}") + if not tree_loaded: + self.children[root] = [] + reward = await self.simulate(root, role) + self.backpropagate(root, reward) + mcts_logger.log("MCTS", f"Root node's value: {reward}") + children = await self.expand(root) + # Currently picks one child at random; this could be extended to multiple children later + first_leaf = random.choice(children) + mcts_logger.log("MCTS", f"Randomly selected leaf node id: {first_leaf.id}") + reward = await self.simulate(first_leaf) + mcts_logger.log("MCTS", f"Normalized score of the simulated leaf node: {reward}") + self.backpropagate(first_leaf, reward) + else: + root = self.root_node
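+ # Selection detail (summary of best_child above): every expanded node is scored with + # uct(node) = Q(node) + c_explore * sqrt(ln(parent.visited) / n_visits), + # where an unvisited node substitutes n_visits = c_unvisited (< 1) and + # Q = value / c_unvisited, inflating its score so exploration is encouraged.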
+ # Subsequent iterations: select with UCT, then expand, simulate, and backpropagate + for _ in range(rollout): # number of rollout iterations + mcts_logger.log("MCTS", f"Starting iteration {_+1}") + leaf = self.select(root) + if leaf.is_terminal(): + if leaf.raw_value == 0: + reward = await self.simulate(leaf) + else: + reward = {"test_score": leaf.raw_value, "score": leaf.value} + mcts_logger.log("MCTS", f"Score of the terminal node: {reward}") + self.backpropagate(leaf, reward) + else: + if leaf.visited > 0: + children = await self.expand(leaf) + leaf = random.choice(children) + mcts_logger.log("MCTS", f"Randomly selected leaf node id: {leaf.id}") + reward = await self.simulate(leaf) + mcts_logger.log("MCTS", f"Normalized score of the simulated leaf node {leaf.id}: {reward}") + self.backpropagate(leaf, reward) + return self.best_path(root)
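+ + # Persistence note (as implemented above): each Node pickles itself to + # Node-<id>.pkl in state["node_dir"] via save_node(), and the matching role + # snapshot is written to Node-<id>.json by save_new_role(); load_tree() below + # rebuilds the search tree from these files so a run can resume via --load_tree.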
+ + def load_tree(self): + def load_children_node(node): + mcts_logger.log("MCTS", f"Loading children of node {node.id}: {node.children}") + if node.is_terminal() or not node.children: + return + for child in node.children: + child.load_node() + self.children[child] = child.children + load_children_node(child) + # Load all pkl files in the node_dir + all_pkl_files = os.listdir(self.root_node.state["node_dir"]) + all_pkl_files = [f for f in all_pkl_files if f.endswith(".pkl")] + if os.path.exists(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl")): + with open(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl"), 'rb') as f: + self.root_node = pickle.load(f) + self.children[self.root_node] = self.root_node.children + load_children_node(self.root_node) + if self.children: + mcts_logger.log("MCTS", "Tree loaded successfully") + return True + return False \ No newline at end of file diff --git a/expo/data.yaml b/expo/data.yaml new file mode 100644 index 000000000..d921e1ebf --- /dev/null +++ b/expo/data.yaml @@ -0,0 +1,157 @@ +datasets_dir: "D:/work/automl/datasets" # path to the datasets directory + +datasets: + titanic: + dataset: "04_titanic" + user_requirement: "This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Don't plot." + metric: "accuracy" + + house_prices: + dataset: "05_house-prices-advanced-regression-techniques" + user_requirement: "This is a house price dataset, your goal is to predict the sale price of a property based on its features. Make sure to generate at least 5 tasks each time, including eda, data preprocessing, feature engineering, model training to predict the target, and model evaluation. Report RMSE between the logarithm of the predicted value and the logarithm of the observed sale prices on the eval data. The target column is 'SalePrice'. Please do not include any processing of the target column in the data preprocessing and feature engineering stages. Don't plot." + metric: "log rmse" + + santander_customers: + dataset: "06_santander-customer-transaction-prediction" + user_requirement: "This is a customers financial dataset. Your goal is to predict which customers will make a specific transaction in the future. The target column is target. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report AUC on the eval data. Don't plot." + metric: "auc" + + icr: + dataset: "07_icr-identify-age-related-conditions" + user_requirement: "ICR dataset is a medical dataset with over fifty anonymized health characteristics linked to three age-related conditions. Your goal is to predict whether a subject has or has not been diagnosed with one of these conditions. Make sure to generate at least 5 tasks each time, including eda, data preprocessing, feature engineering, model training to predict the target, and model evaluation. The target column is Class. Report F1 Score on the eval data. Don't plot." + metric: "f1" + + santander_value: + dataset: "08_santander-value-prediction-challenge" + user_requirement: "This is a regression problem. Your goal is to predict the value of transactions for potential customers. The target column is target. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report RMSE on the eval data. Don't plot." + metric: "rmse" + + load_wine: + dataset: None + user_requirement: "Analyze the 'load_wine' dataset from sklearn to predict wine quality. Visualize relationships between features, use machine learning for classification, and report model accuracy. Include analysis and prediction visualizations. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Don't plot!" + metric: "accuracy" + + Click_prediction_small: + dataset: Click_prediction_small + metric: f1 + user_requirement: "This is a Click_prediction_small dataset. Your goal is to predict\ + \ the target column `click`.\nPerform data analysis, data preprocessing, feature\ + \ engineering, and modeling to predict the target. \nReport f1 on the eval data.\ + \ Do not plot or make any visualizations.\n" + + GesturePhaseSegmentationProcessed: + dataset: GesturePhaseSegmentationProcessed + metric: f1 weighted + user_requirement: "This is a GesturePhaseSegmentationProcessed dataset. Your goal\ + \ is to predict the target column `Phase`.\nPerform data analysis, data preprocessing,\ + \ feature engineering, and modeling to predict the target. \nReport f1 weighted\ + \ on the eval data. Do not plot or make any visualizations.\n" + + Moneyball: + dataset: Moneyball + metric: rmse + user_requirement: "This is a Moneyball dataset. Your goal is to predict the target\ + \ column `RS`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport rmse on the eval data. Do not\ + \ plot or make any visualizations.\n" + + SAT11-HAND-runtime-regression: + dataset: SAT11-HAND-runtime-regression + metric: rmse + user_requirement: "This is a SAT11-HAND-runtime-regression dataset. Your goal\ + \ is to predict the target column `runtime`.\nPerform data analysis, data preprocessing,\ + \ feature engineering, and modeling to predict the target. \nReport rmse on\ + \ the eval data. Do not plot or make any visualizations.\n" + + boston: + dataset: boston + metric: rmse + user_requirement: "This is a boston dataset. Your goal is to predict the target\ + \ column `MEDV`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport rmse on the eval data. Do not\ + \ plot or make any visualizations.\n" + + colleges: + dataset: colleges + metric: rmse + user_requirement: "This is a colleges dataset. Your goal is to predict the target\ + \ column `percent_pell_grant`.\nPerform data analysis, data preprocessing, feature\ + \ engineering, and modeling to predict the target. \nReport rmse on the eval\ + \ data. Do not plot or make any visualizations.\n" + + credit-g: + dataset: credit-g + metric: f1 + user_requirement: "This is a credit-g dataset. Your goal is to predict the target\ + \ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. 
\nReport f1 on the eval data. Do not plot\ + \ or make any visualizations.\n" + + diamonds: + dataset: diamonds + metric: rmse + user_requirement: "This is a diamonds dataset. Your goal is to predict the target\ + \ column `price`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport rmse on the eval data. Do not\ + \ plot or make any visualizations.\n" + + jasmine: + dataset: jasmine + metric: f1 + user_requirement: "This is a jasmine dataset. Your goal is to predict the target\ + \ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\ + \ or make any visualizations.\n" + + kc1: + dataset: kc1 + metric: f1 + user_requirement: "This is a kc1 dataset. Your goal is to predict the target column\ + \ `defects`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\ + \ or make any visualizations.\n" + + kick: + dataset: kick + metric: f1 + user_requirement: "This is a kick dataset. Your goal is to predict the target\ + \ column `IsBadBuy`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\ + \ or make any visualizations.\n" + + mfeat-factors: + dataset: mfeat-factors + metric: f1 weighted + user_requirement: "This is a mfeat-factors dataset. Your goal is to predict the\ + \ target column `class`.\nPerform data analysis, data preprocessing, feature\ + \ engineering, and modeling to predict the target. \nReport f1 weighted on the\ + \ eval data. Do not plot or make any visualizations.\n" + + segment: + dataset: segment + metric: f1 weighted + user_requirement: "This is a segment dataset. Your goal is to predict the target\ + \ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport f1 weighted on the eval data.\ + \ Do not plot or make any visualizations.\n" + + steel-plates-fault: + dataset: steel-plates-fault + metric: f1 weighted + user_requirement: "This is a steel-plates-fault dataset. Your goal is to predict\ + \ the target column `target`.\nPerform data analysis, data preprocessing, feature\ + \ engineering, and modeling to predict the target. \nReport f1 weighted on the\ + \ eval data. Do not plot or make any visualizations.\n" + + wine-quality-white: + dataset: wine-quality-white + metric: f1 weighted + user_requirement: "This is a wine-quality-white dataset. Your goal is to predict\ + \ the target column `Class`.\nPerform data analysis, data preprocessing, feature\ + \ engineering, and modeling to predict the target. \nReport f1 weighted on the\ + \ eval data. 
Do not plot or make any visualizations.\n" + + +work_dir: D:/work/MG-open/MetaGPT/workspace # path to the workspace directory +role_dir: storage/team/environment/roles/ResearchAssistant_David +# analysis_pool_dir: D:/work/MG-open/MetaGPT/examples/MCTS_test/analysis_pool_sample.json \ No newline at end of file diff --git a/expo/dataset.py b/expo/dataset.py new file mode 100644 index 000000000..4bce6e9fe --- /dev/null +++ b/expo/dataset.py @@ -0,0 +1,262 @@ +import openml +from pathlib import Path +from sklearn.model_selection import train_test_split +import os +import json +import yaml +import pandas as pd +from expo.insights.solution_designer import SolutionDesigner +import asyncio + +BASE_USER_REQUIREMENT = """\ +This is a {datasetname} dataset. Your goal is to predict the target column `{target_col}`. +Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. +Report {metric} on the eval data. Do not plot or make any visualizations. +""" + +SEED = 100 +TRAIN_TEST_SPLIT = 0.8 +TRAIN_DEV_SPLIT = 0.75 + +OPENML_DATASET_IDS = [ + # reg + 41021, + 42727, + 41980, + 42225, + 531, + + # cls + 41143, + 31, + 42733, + 41162, + 1067, + + # multi cls + 40498, + 40982, + 12, + 40984, + 4538, +] + +CUSTOM_DATASETS = [ + ("04_titanic", "Survived"), + ("05_house-prices-advanced-regression-techniques", "SalePrice"), + ("06_santander-customer-transaction-prediction", "target"), + ("07_icr-identify-age-related-conditions", "Class") +] + +def get_split_dataset_path(dataset_name, config): + datasets_dir = config['datasets_dir'] + if dataset_name in config['datasets']: + dataset = config['datasets'][dataset_name] + data_path = os.path.join(datasets_dir, dataset['dataset']) + split_datasets = { + "train": os.path.join(data_path, "split_train.csv"), + "dev": os.path.join(data_path, "split_dev.csv"), + "dev_wo_target": os.path.join(data_path, "split_dev_wo_target.csv"), + "dev_target": os.path.join(data_path, "split_dev_target.csv"), + "test": os.path.join(data_path, "split_test.csv"), + "test_wo_target": os.path.join(data_path, "split_test_wo_target.csv"), + "test_target": os.path.join(data_path, "split_test_target.csv"), + } + return split_datasets + else: + raise ValueError(f"Dataset {dataset_name} not found in config file. Available datasets: {config['datasets'].keys()}") + +def get_user_requirement(task_name, config): + datasets_dir = config['datasets_dir'] + if task_name in config['datasets']: + dataset = config['datasets'][task_name] + data_path = os.path.join(datasets_dir, dataset['dataset']) + user_requirement = dataset['user_requirement'] + return data_path, user_requirement + else: + raise ValueError(f"Dataset {task_name} not found in config file. 
Available datasets: {config['datasets'].keys()}") + + +def save_datasets_dict_to_yaml(datasets_dict): + with open("datasets.yaml", "w") as file: + yaml.dump(datasets_dict, file) + +def create_dataset_dict(dataset): + dataset_dict = { + "dataset": dataset.name, + "user_requirement": dataset.create_base_requirement(), + "metric": dataset.get_metric() + } + return dataset_dict + +class ExpDataset: + description : str = None + metadata : dict = None + dataset_dir : str = None + target_col : str = None + name : str = None + + def __init__(self, name, dataset_dir, **kwargs): + self.name = name + self.dataset_dir = dataset_dir + self.target_col = kwargs.get("target_col", None) + self.force_update = kwargs.get("force_update", False) + self.save_dataset(target_col=self.target_col) + + def check_dataset_exists(self): + fnames = ["split_train.csv", "split_dev.csv", "split_test.csv", + "split_dev_wo_target.csv", "split_dev_target.csv", + "split_test_wo_target.csv", "split_test_target.csv"] + for fname in fnames: + if not os.path.exists(Path(self.dataset_dir, self.name, fname)): + return False + return True + + def check_datasetinfo_exists(self): + return os.path.exists(Path(self.dataset_dir, self.name, "dataset_info.json")) + + + def get_raw_dataset(self): + raw_dir = Path(self.dataset_dir, self.name, "raw") + if not os.path.exists(Path(raw_dir, "train.csv")): + raise FileNotFoundError(f"Raw dataset `train.csv` not found in {raw_dir}") + else: + df = pd.read_csv(Path(raw_dir, "train.csv")) + return df + + def get_dataset_info(self): + raw_df = pd.read_csv(Path(self.dataset_dir, self.name, "raw", "train.csv")) + metadata = { + 'NumberOfClasses': raw_df[self.target_col].nunique(), + 'NumberOfFeatures': raw_df.shape[1], + 'NumberOfInstances': raw_df.shape[0], + 'NumberOfInstancesWithMissingValues': int(raw_df.isnull().any(axis=1).sum()), + 'NumberOfMissingValues': int(raw_df.isnull().sum().sum()), + 'NumberOfNumericFeatures': raw_df.select_dtypes(include=['number']).shape[1], + 'NumberOfSymbolicFeatures': raw_df.select_dtypes(include=['object']).shape[1], + } + + df_head_text = raw_df.head().to_string(index=False) + + dataset_info = { + "name": self.name, + "description": "", + "target_col": self.target_col, + "metadata": metadata, + "df_head": df_head_text + } + return dataset_info + + def get_metric(self): + dataset_info = self.get_dataset_info() + num_classes = dataset_info["metadata"]["NumberOfClasses"] + if num_classes == 2: + metric = "f1" + elif 2 < num_classes <= 200: + metric = "f1 weighted" + elif num_classes > 200 or num_classes == 0: + metric = "rmse" + else: + raise ValueError(f"Number of classes {num_classes} not supported") + return metric + + def create_base_requirement(self): + metric = self.get_metric() + req = BASE_USER_REQUIREMENT.format(datasetname=self.name, target_col=self.target_col, metric=metric) + return req + + def save_dataset(self, target_col): + + df = self.get_raw_dataset() + if not self.check_dataset_exists() or self.force_update: + print(f"Saving Dataset {self.name} in {self.dataset_dir}") + self.split_and_save(df, target_col) + else: + print(f"Dataset {self.name} already exists") + if not self.check_datasetinfo_exists() or self.force_update: + print(f"Saving Dataset info for {self.name}") + dataset_info = self.get_dataset_info() + self.save_datasetinfo(dataset_info) + else: + print(f"Dataset info for {self.name} already exists") + + def save_datasetinfo(self, dataset_info): + with open(Path(self.dataset_dir, self.name, "dataset_info.json"), "w") as file: + 
json.dump(dataset_info, file, indent=4) + + def save_split_datasets(self, df, split, target_col=None): + path = Path(self.dataset_dir, self.name) + df.to_csv(Path(path, f"split_{split}.csv"), index=False) + if target_col: + df_wo_target = df.drop(columns=[target_col]) + df_wo_target.to_csv(Path(path, f"split_{split}_wo_target.csv"), index=False) + df_target = df[[target_col]].copy() + if target_col != "target": + df_target["target"] = df_target[target_col] + df_target = df_target.drop(columns=[target_col]) + df_target.to_csv(Path(path, f"split_{split}_target.csv"), index=False) + + def split_and_save(self, df, target_col): + if not target_col: + raise ValueError("Target column not provided") + train, test = train_test_split(df, test_size=1-TRAIN_TEST_SPLIT, random_state=SEED) + train, dev = train_test_split(train, test_size=1-TRAIN_DEV_SPLIT, random_state=SEED) + self.save_split_datasets(train, "train") + self.save_split_datasets(dev, "dev", target_col) + self.save_split_datasets(test, "test", target_col) + + + +class OpenMLExpDataset(ExpDataset): + def __init__(self, name, dataset_dir, dataset_id, **kwargs): + self.dataset_id = dataset_id + self.dataset = openml.datasets.get_dataset(self.dataset_id, + download_data=False, + download_qualities=False, + download_features_meta_data=True) + self.name = self.dataset.name + self.target_col = self.dataset.default_target_attribute + super().__init__(self.name, dataset_dir, target_col=self.target_col, **kwargs) + + + def get_raw_dataset(self): + dataset = self.dataset + dataset_df, *_ = dataset.get_data() + raw_dir = Path(self.dataset_dir, self.name, "raw") + os.makedirs(raw_dir, exist_ok=True) + dataset_df.to_csv(Path(raw_dir, "train.csv"), index=False) + return dataset_df + + def get_dataset_info(self): + dataset_info = super().get_dataset_info() + dataset = self.dataset + dataset_info["name"] = dataset.name + dataset_info["description"] = dataset.description + dataset_info["metadata"].update(dataset.qualities) + return dataset_info + + +# class HFExpDataset(ExpDataset): +# def __init__(self, name, dataset_dir, dataset_name, **kwargs): +# super().__init__(name, dataset_dir, **kwargs) + + + +if __name__ == "__main__": + datasets_dir = "D:/work/automl/datasets" + force_update = True + datasets_dict = {"datasets": {}} + solution_designer = SolutionDesigner() + for dataset_id in OPENML_DATASET_IDS: + openml_dataset = OpenMLExpDataset("", datasets_dir, dataset_id, force_update=force_update) + asyncio.run(solution_designer.generate_solutions(openml_dataset.get_dataset_info(), openml_dataset.name)) + dataset_dict = create_dataset_dict(openml_dataset) + datasets_dict["datasets"][openml_dataset.name] = dataset_dict + + for dataset_name, target_col in CUSTOM_DATASETS: + custom_dataset = ExpDataset(dataset_name, datasets_dir, target_col=target_col, force_update=force_update) + asyncio.run(solution_designer.generate_solutions(custom_dataset.get_dataset_info(), custom_dataset.name)) + dataset_dict = create_dataset_dict(custom_dataset) + datasets_dict["datasets"][custom_dataset.name] = dataset_dict + + save_datasets_dict_to_yaml(datasets_dict) diff --git a/expo/datasets.yaml b/expo/datasets.yaml new file mode 100644 index 000000000..ec00e3d1f --- /dev/null +++ b/expo/datasets.yaml @@ -0,0 +1,134 @@ +datasets: + 04_titanic: + dataset: 04_titanic + metric: f1 + user_requirement: "This is a 04_titanic dataset. 
Your goal is to predict the target\ + \ column `Survived`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\ + \ or make any visualizations.\n" + 05_house-prices-advanced-regression-techniques: + dataset: 05_house-prices-advanced-regression-techniques + metric: rmse + user_requirement: "This is a 05_house-prices-advanced-regression-techniques dataset.\ + \ Your goal is to predict the target column `SalePrice`.\nPerform data analysis,\ + \ data preprocessing, feature engineering, and modeling to predict the target.\ + \ \nReport rmse on the eval data. Do not plot or make any visualizations.\n" + 06_santander-customer-transaction-prediction: + dataset: 06_santander-customer-transaction-prediction + metric: f1 + user_requirement: "This is a 06_santander-customer-transaction-prediction dataset.\ + \ Your goal is to predict the target column `target`.\nPerform data analysis,\ + \ data preprocessing, feature engineering, and modeling to predict the target.\ + \ \nReport f1 on the eval data. Do not plot or make any visualizations.\n" + 07_icr-identify-age-related-conditions: + dataset: 07_icr-identify-age-related-conditions + metric: f1 + user_requirement: "This is a 07_icr-identify-age-related-conditions dataset. Your\ + \ goal is to predict the target column `Class`.\nPerform data analysis, data\ + \ preprocessing, feature engineering, and modeling to predict the target. \n\ + Report f1 on the eval data. Do not plot or make any visualizations.\n" + Click_prediction_small: + dataset: Click_prediction_small + metric: f1 + user_requirement: "This is a Click_prediction_small dataset. Your goal is to predict\ + \ the target column `click`.\nPerform data analysis, data preprocessing, feature\ + \ engineering, and modeling to predict the target. \nReport f1 on the eval data.\ + \ Do not plot or make any visualizations.\n" + GesturePhaseSegmentationProcessed: + dataset: GesturePhaseSegmentationProcessed + metric: f1 weighted + user_requirement: "This is a GesturePhaseSegmentationProcessed dataset. Your goal\ + \ is to predict the target column `Phase`.\nPerform data analysis, data preprocessing,\ + \ feature engineering, and modeling to predict the target. \nReport f1 weighted\ + \ on the eval data. Do not plot or make any visualizations.\n" + Moneyball: + dataset: Moneyball + metric: rmse + user_requirement: "This is a Moneyball dataset. Your goal is to predict the target\ + \ column `RS`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport rmse on the eval data. Do not\ + \ plot or make any visualizations.\n" + SAT11-HAND-runtime-regression: + dataset: SAT11-HAND-runtime-regression + metric: rmse + user_requirement: "This is a SAT11-HAND-runtime-regression dataset. Your goal\ + \ is to predict the target column `runtime`.\nPerform data analysis, data preprocessing,\ + \ feature engineering, and modeling to predict the target. \nReport rmse on\ + \ the eval data. Do not plot or make any visualizations.\n" + boston: + dataset: boston + metric: rmse + user_requirement: "This is a boston dataset. Your goal is to predict the target\ + \ column `MEDV`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport rmse on the eval data. Do not\ + \ plot or make any visualizations.\n" + colleges: + dataset: colleges + metric: rmse + user_requirement: "This is a colleges dataset. 
Your goal is to predict the target\ + \ column `percent_pell_grant`.\nPerform data analysis, data preprocessing, feature\ + \ engineering, and modeling to predict the target. \nReport rmse on the eval\ + \ data. Do not plot or make any visualizations.\n" + credit-g: + dataset: credit-g + metric: f1 + user_requirement: "This is a credit-g dataset. Your goal is to predict the target\ + \ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\ + \ or make any visualizations.\n" + diamonds: + dataset: diamonds + metric: rmse + user_requirement: "This is a diamonds dataset. Your goal is to predict the target\ + \ column `price`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport rmse on the eval data. Do not\ + \ plot or make any visualizations.\n" + jasmine: + dataset: jasmine + metric: f1 + user_requirement: "This is a jasmine dataset. Your goal is to predict the target\ + \ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\ + \ or make any visualizations.\n" + kc1: + dataset: kc1 + metric: f1 + user_requirement: "This is a kc1 dataset. Your goal is to predict the target column\ + \ `defects`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\ + \ or make any visualizations.\n" + kick: + dataset: kick + metric: f1 + user_requirement: "This is a kick dataset. Your goal is to predict the target\ + \ column `IsBadBuy`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\ + \ or make any visualizations.\n" + mfeat-factors: + dataset: mfeat-factors + metric: f1 weighted + user_requirement: "This is a mfeat-factors dataset. Your goal is to predict the\ + \ target column `class`.\nPerform data analysis, data preprocessing, feature\ + \ engineering, and modeling to predict the target. \nReport f1 weighted on the\ + \ eval data. Do not plot or make any visualizations.\n" + segment: + dataset: segment + metric: f1 weighted + user_requirement: "This is a segment dataset. Your goal is to predict the target\ + \ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\ + \ and modeling to predict the target. \nReport f1 weighted on the eval data.\ + \ Do not plot or make any visualizations.\n" + steel-plates-fault: + dataset: steel-plates-fault + metric: f1 weighted + user_requirement: "This is a steel-plates-fault dataset. Your goal is to predict\ + \ the target column `target`.\nPerform data analysis, data preprocessing, feature\ + \ engineering, and modeling to predict the target. \nReport f1 weighted on the\ + \ eval data. Do not plot or make any visualizations.\n" + wine-quality-white: + dataset: wine-quality-white + metric: f1 weighted + user_requirement: "This is a wine-quality-white dataset. Your goal is to predict\ + \ the target column `Class`.\nPerform data analysis, data preprocessing, feature\ + \ engineering, and modeling to predict the target. \nReport f1 weighted on the\ + \ eval data. 
Do not plot or make any visualizations.\n" diff --git a/expo/evaluation/evaluation.py b/expo/evaluation/evaluation.py new file mode 100644 index 000000000..20a35aa27 --- /dev/null +++ b/expo/evaluation/evaluation.py @@ -0,0 +1,23 @@ +from sklearn.metrics import f1_score, accuracy_score, roc_auc_score, mean_squared_error +import numpy as np + +def evaluate_score(pred, gt, metric): + if metric == "accuracy": + return accuracy_score(gt, pred) + elif metric == "f1": + unique_classes = np.unique(gt) + if 1 in unique_classes and 0 in unique_classes: + pos_label = 1 + else: + pos_label = unique_classes[0] if len(unique_classes) == 2 else None + return f1_score(gt, pred, pos_label=pos_label) + elif metric == "f1 weighted": + return f1_score(gt, pred, average="weighted") + elif metric == "roc_auc": + return roc_auc_score(gt, pred) + elif metric == "rmse": + return mean_squared_error(gt, pred, squared=False) + elif metric == "log rmse": + return mean_squared_error(np.log1p(gt), np.log1p(pred), squared=False) + else: + raise ValueError(f"Metric {metric} not supported") \ No newline at end of file diff --git a/expo/evaluation/visualize_mcts.py b/expo/evaluation/visualize_mcts.py new file mode 100644 index 000000000..6e38576e2 --- /dev/null +++ b/expo/evaluation/visualize_mcts.py @@ -0,0 +1,54 @@ + +from expo.MCTS import Node, MCTS +import textwrap + +NODE_TEMPLATE = """\ +[Node {id}] +Plans: +{plans} +Simulated: {simulated} +Score: {score}, Visits: {num_visits} + +""" + +def get_role_plans(role): + plans = role.planner.plan.tasks + instruct_plans = [f"{i+1}. {task.instruction}" for i, task in enumerate(plans)] + return instruct_plans + + +def get_tree_text(node : Node): + role_dict = {} + code_set = set() + def load_role(node): + if node.id not in role_dict: + role_dict[node.id] = node.load_role() + return role_dict[node.id] + + def visualize_node(node : Node, previous_plans=None): + role = load_role(node) + node_id = node.id + plans = role.planner.plan.tasks + instruct_plans = [f"{i+1}. 
{task.instruction}" for i, task in enumerate(plans)] + if previous_plans is not None: + instruct_plans = [plan for plan, prev_plan in zip(instruct_plans, previous_plans) if plan != prev_plan] + instruct_plans_text = "\n".join(instruct_plans) + simulated = role.state_saved + score = f"avg score: {node.avg_value()}, simulated score: {node.raw_reward}" + num_visits = node.visited + return NODE_TEMPLATE.format(id=node_id, plans=instruct_plans_text, simulated=simulated, score=score, num_visits=num_visits) + + def visualize_tree(node, depth=0, previous_plans=None): + text = "" + if node is not None: + text += visualize_node(node, previous_plans) + role = load_role(node) + code_set.update({task.instruction for task in role.planner.plan.tasks}) + previous_plans = get_role_plans(role) + for child in node.children: + text += textwrap.indent(visualize_tree(child, depth+1, previous_plans), "\t") + return text + + return visualize_tree(node), len(code_set) + + diff --git a/expo/experimenter/aug_experimenter.py b/expo/experimenter/aug_experimenter.py new file mode 100644 index 000000000..e69de29bb diff --git a/expo/experimenter/experimenter.py b/expo/experimenter/experimenter.py new file mode 100644 index 000000000..d7ed82070 --- /dev/null +++ b/expo/experimenter/experimenter.py @@ -0,0 +1,18 @@ + +class Experimenter: + result_path : str = "results" + + async def run_experiment(self): + pass + + + def save_scores(self): + pass + + def save_result(self): + results = { + "test_score": self.test_score, + "num_experiments": self.num_experiments, + "insights": self.insights, + "avg_score": self.avg_score, + } + # return the assembled results so callers can decide where to persist them + return results \ No newline at end of file diff --git a/expo/experimenter/mcts_experimenter.py b/expo/experimenter/mcts_experimenter.py new file mode 100644 index 000000000..e69de29bb diff --git a/expo/insights/InsightGenerate.py b/expo/insights/InsightGenerate.py new file mode 100644 index 000000000..de58b7e4e --- /dev/null +++ b/expo/insights/InsightGenerate.py @@ -0,0 +1,114 @@ +REFLECTION_SYSTEM_MSG = "As a Kaggle grandmaster participating in a competition, you need to analyze your experience and propose evolutionary points that are more likely to improve the performance of baseline code."
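+ +# Overview (illustrative): generate_new_instructions() loads per-task insights +# from the dataset's ds_analysis_pool.json (entries carry "task_id" and +# "Analysis" keys) and asks the LLM, via CHANGE_INSTRUCTION below, to rewrite +# one plan step per insight, expecting a JSON object with a "New Instruction" field.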
+ +CHANGE_INSTRUCTION = """ +# Original instruction +{instruction} + +# Insights +{insights} + +Rewrite the original instruction according to the insights + +# Expected Output Format (strict) +```json +{{ + "Original Instruction": "original instruction", + "New Instruction": "new instruction" +}} +``` +""" + +import re +import random +import json +from metagpt.llm import LLM +from metagpt.schema import Message +from expo.utils import load_data_config, mcts_logger +DATA_CONFIG = load_data_config() + + +class InsightGenerator: + data_config = DATA_CONFIG + + @staticmethod + def load_json_data(json_dir): + with open(json_dir, "r") as file: + json_data = json.load(file) + return json_data + + @staticmethod + def _random_sample(analysis, num_samples): + return random.sample(analysis, num_samples) + + @staticmethod + def sample_instruction_set(data): + data_dict = {} + for item in data: + task_id = item["task_id"] + if task_id not in data_dict: + data_dict[task_id] = [] + data_dict[task_id].append(item) + instruction_set = [] + for task_id in sorted(data_dict.keys()): + instruction_set.append(random.choice(data_dict[task_id])) + return instruction_set + + + @staticmethod + def clean_json_from_rsp(text): + pattern = r"```json(.*?)```" + matches = re.findall(pattern, text, re.DOTALL) + if matches: + json_str = "\n".join(matches) + return json_str + else: + return "" + + @staticmethod + def format_output(rsp): + rsp_list = [] + new_data = [] + rsp_list.append(rsp) + for item in rsp_list: + item_dict = json.loads(item) + data = { + "Insights": item_dict, + } + new_data.append(data) + return new_data + + @staticmethod + def load_analysis_pool(file_path, task_id=None): + data = InsightGenerator.load_json_data(file_path) + for item in data: + if "task_id" not in item: + raise ValueError("task_id is not found in the analysis pool") + + if task_id: + data = [item for item in data if int(item["task_id"]) == int(task_id)] + return data + + @staticmethod + async def generate_new_instructions(task_id, original_instruction, max_num, file_path): + data = InsightGenerator.load_analysis_pool(file_path, task_id) + new_instructions = [] + if len(data) == 0: + mcts_logger.log("MCTS", f"No insights available for task {task_id}") + return [original_instruction] # Return the original instruction if no insights are available + for item in data[:max_num]: + insights = item["Analysis"] + new_instruction = await InsightGenerator.generate_new_instruction(original_instruction, insights) + new_instructions.append(new_instruction) + return new_instructions + + @staticmethod + async def generate_new_instruction(original_instruction, insights): + prompt = CHANGE_INSTRUCTION.format(instruction=original_instruction, insights=insights) + llm = LLM() + context = llm.format_msg([Message(content=prompt, role="user")]) + llm_response = await llm.aask( + context, system_msgs=[REFLECTION_SYSTEM_MSG] + ) + rsp = InsightGenerator.clean_json_from_rsp(llm_response) + new_instruction = json.loads(rsp)["New Instruction"] + return new_instruction \ No newline at end of file diff --git a/expo/insights/solution_designer.py b/expo/insights/solution_designer.py new file mode 100644 index 000000000..0986c392a --- /dev/null +++ b/expo/insights/solution_designer.py @@ -0,0 +1,127 @@ +import re +import random +import json +from metagpt.llm import LLM +from metagpt.schema import Message +from expo.utils import clean_json_from_rsp, load_data_config + + +DATA_CONFIG = load_data_config() + +DATASET_INSIGHT_PROMPT = """ +# Dataset 
Description +{dataset} + +# Dataset Metadata +{metadata} + +# Dataset Head +{head} + +# Instruction +Propose insights to help improve the performance of the model on this dataset. +The insights should be proposed based on the dataset description with different task types. +Each task type should have at least 5 insights. +Make sure each method is independent and can be implemented separately. + +# Format +```json +[ + {{ + "task_type": "EDA", + "insights": [ + "insight1", + "insight2", + "insight3", + ... + "insightN" + ] + }}, + {{ + "task_type": "Data Preprocessing", + "insights": [ + "insight1", + "insight2", + "insight3", + ... + "insightN" + ] + }}, + {{ + "task_type": "Feature Engineering", + "insights": [ + "insight1", + "insight2", + "insight3", + ... + "insightN" + ] + }}, + {{ + "task_type": "Model Training", + "insights": [ + "insight1", + "insight2", + "insight3", + ... + "insightN" + ] + }} +] +``` +""" + +KEY_DATASET_FEATURES = [ + 'NumberOfClasses', + 'NumberOfFeatures', + 'NumberOfInstances', + 'NumberOfInstancesWithMissingValues', + 'NumberOfMissingValues', + 'NumberOfNumericFeatures', + 'NumberOfSymbolicFeatures' +] + +TASK_TO_ID = { + "EDA": 1, + "Data Preprocessing": 2, + "Feature Engineering": 3, + "Model Training": 4, + "Model Evaluation": 5 +} + +class SolutionDesigner: + data_dir : str= DATA_CONFIG["datasets_dir"] + + async def generate_solutions(self, dataset_info, dataset_name): + llm = LLM() + context = DATASET_INSIGHT_PROMPT.format(dataset=dataset_info["description"], + metadata=self.metadata_builder(dataset_info["metadata"]), + head=dataset_info["df_head"]) + rsp = await llm.aask(context) + rsp = clean_json_from_rsp(rsp) + analysis_pool = self.process_analysis_pool(json.loads(rsp)) + dataset_path = f"{self.data_dir}/{dataset_name}" + self.save_analysis_pool(dataset_path, analysis_pool) + + + def process_analysis_pool(self, insights_rsp): + analysis_pool = [] + for task_type_insights in insights_rsp: + task_type = task_type_insights["task_type"] + for insight in task_type_insights["insights"]: + analysis_pool.append({"Analysis": insight, "Category": task_type, "task_id": TASK_TO_ID[task_type]}) + return analysis_pool + + + def metadata_builder(self, qualities): + metadata = {} + for key in KEY_DATASET_FEATURES: + metadata[key] = qualities.get(key, "N/A") + metadata_text = json.dumps(metadata, indent=4) + return metadata_text + + def save_analysis_pool(self, dataset_path, analysis_pool): + fpath = f"{dataset_path}/ds_analysis_pool.json" + with open(fpath, "w") as file: + json.dump(analysis_pool, file, indent=4) + \ No newline at end of file diff --git a/expo/research_assistant.py b/expo/research_assistant.py new file mode 100644 index 000000000..fbd74f7db --- /dev/null +++ b/expo/research_assistant.py @@ -0,0 +1,141 @@ +from __future__ import annotations + +import json +from metagpt.roles.di.data_interpreter import DataInterpreter +from metagpt.schema import Message, Task, TaskResult +from metagpt.strategy.task_type import TaskType +from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender +from metagpt.utils.common import CodeParser +from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info +from metagpt.const import MESSAGE_ROUTE_TO_ALL, SERDESER_PATH +from metagpt.utils.recovery_util import save_history +from expo.utils import mcts_logger, save_notebook +import re +import os + +EXTRACT_SCORE_PROMPT = """ +# Code: +{code} + +# Execution Result: +{result} + +# Instruction: +Based on the code and execution result, 
please extract the scores and return them as a dictionary. +If you cannot find the scores, please still return a dictionary with the keys 'train_score', 'dev_score', and 'test_score', and set the values to -1. + +# Format: +```json +{{ + "train_score": x.x, + "dev_score": x.x, + "test_score": x.x +}} +``` +""" + +class ResearchAssistant(DataInterpreter): + node_id: str = "0" + start_task_id: int = 1 + state_saved : bool = False + role_dir : str = SERDESER_PATH.joinpath("team", "environment", "roles", "Experimenter") + + def get_node_name(self): + return f"Node-{self.node_id}" + + def get_next_instruction(self): + return self.planner.plan.tasks[self.start_task_id] + + def change_next_instruction(self, new_instruction): + if new_instruction is not None: + self.planner.plan.task_map[str(self.start_task_id)].instruction = new_instruction + self.remap_tasks() + + + def update_til_start_task(self, role: ResearchAssistant, backward: bool = True): + if backward: + # make sure the previous task instructions are matched + assert self.start_task_id == role.start_task_id - 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}" + for i in range(self.start_task_id): + if self.planner.plan.task_map[str(i)].instruction != role.planner.plan.task_map[str(i)].instruction: + mcts_logger.info("Previous task instructions not matched") + self.remap_tasks() + return + # copy new role's task (self.start_task_id) to current role + self.planner.plan.task_map[str(self.start_task_id)] = role.planner.plan.task_map[str(self.start_task_id)].model_copy() + self.remap_tasks() + + else: + assert self.start_task_id == role.start_task_id + 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}" + if int(role.planner.plan.current_task_id) > self.start_task_id: + for i in range(role.start_task_id): + self.planner.plan.task_map[str(i)] = role.planner.plan.task_map[str(i)].model_copy() + self.remap_tasks() + + async def get_score(self): + score_dict = await self.llm_extract_score() + score_dict["score"] = score_dict["dev_score"] + return score_dict + + async def llm_extract_score(self): + result_text = self.planner.plan.task_map[str(len(self.planner.plan.task_map))].result + code_text = self.planner.plan.task_map[str(len(self.planner.plan.task_map))].code + rsp = await self.llm.aask(EXTRACT_SCORE_PROMPT.format(code=code_text, result=result_text, role="user")) + json_block = CodeParser.parse_code(block=None, text=rsp) + score_dict = json.loads(json_block) + return score_dict
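+ + # NOTE (assumption): llm_extract_score() trusts the LLM to emit the ```json + # block requested by EXTRACT_SCORE_PROMPT; per the prompt, missing scores come + # back as -1, e.g. {"train_score": 0.95, "dev_score": 0.82, "test_score": -1}.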
+ async def _act_on_task(self, current_task: Task) -> TaskResult: + """Useful in 'plan_and_act' mode. Wrap the output in a TaskResult for review and confirmation.""" + mcts_logger.info(f"The current_task is: {current_task}") + + # execute the code for the current task + code, result, is_success = await self._write_and_exec_code() + task_result = TaskResult(code=code, result=result, is_success=is_success) + # only save the state when the task type is 'feature engineering' + if int(current_task.task_id) == self.start_task_id + 1: + # fe_id = current_task.dependent_task_ids + self.save_state() + save_notebook(role=self, save_dir=self.role_dir, name=self.get_node_name()) + return task_result + + def save_state(self, static_save=False): + if self.state_saved and not static_save: + return + if not static_save: + self.state_saved = True + mcts_logger.log("MCTS", f"Saving state at task {self.start_task_id}") + else: + mcts_logger.log("MCTS", "Static saving") + stg_path = self.role_dir + name = self.get_node_name() + role_path = os.path.join(stg_path, f"{name}.json") + # save the state as a JSON file + write_json_file(role_path, self.model_dump()) + save_history(role=self, save_dir=stg_path, name=name) + + + def remap_tasks(self): + self.planner.plan.tasks = [self.planner.plan.task_map[task_id] for task_id in sorted(self.planner.plan.task_map.keys())] + + + async def run(self, with_message=None) -> Message | None: + """Observe, and think and act based on the results of the observation""" + if with_message == "continue": + # self.set_todo(None) + # working_memory = self.working_memory + # self.remap_tasks() + mcts_logger.info("Continue to run") + self.rc.working_memory.clear() + self.working_memory.clear() + # self.rc.todo = WriteAnalysisCode() + rsp = await self.react() + # publish the response message to the Environment so it is passed on to subscribers + self.set_todo(None) + self.publish_message(rsp) + return rsp + return await super().run(with_message) + + + + \ No newline at end of file diff --git a/expo/results/PLACEHOLDER b/expo/results/PLACEHOLDER new file mode 100644 index 000000000..e69de29bb diff --git a/expo/results/tree/TREE b/expo/results/tree/TREE new file mode 100644 index 000000000..e69de29bb diff --git a/expo/run_exp_augmentation.py b/expo/run_exp_augmentation.py new file mode 100644 index 000000000..492a424d4 --- /dev/null +++ b/expo/run_exp_augmentation.py @@ -0,0 +1,96 @@ +import os +from expo.research_assistant import ResearchAssistant +import asyncio +from expo.utils import DATA_CONFIG, generate_task_requirement, get_exp_pool_path +from expo.insights.InsightGenerate import InsightGenerator +from expo.MCTS import create_initial_state +from expo.evaluation.evaluation import evaluate_score +import json +import argparse +import pandas as pd +import datetime + +EXPS_PROMPT = """ +When doing the tasks, you can refer to the insights below: +{experience} + +""" +data_config = DATA_CONFIG + +def evaluate_test(score, state): + datetime_text = datetime.datetime.now().strftime("%Y%m%d%H%M") + task_name = state["task"] + prediction_fpath = os.path.join(state["work_dir"], task_name, "predictions.csv") + predictions = pd.read_csv(prediction_fpath)["target"] + # copy predictions.csv to the node_dir + + predictions_node_fpath = os.path.join("results", f"{task_name}-{datetime_text}-predictions.csv") + predictions.to_csv(predictions_node_fpath, index=False) + # load test_target.csv + split_datasets_dir = state["datasets_dir"] + gt = pd.read_csv(os.path.join(split_datasets_dir["test_target"]))["target"] + metric = state["dataset_config"]["metric"] + score["test_score"] = evaluate_score(predictions, gt, metric) + return score + + +
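+# NOTE: evaluate_test() assumes the workspace run left predictions.csv under +# {work_dir}/{task}/ and compares it against split_test_target.csv from the +# split datasets dir; both paths come from create_initial_state() above. + +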
+async def main(task_name, use_reflection=True, mode="single", num_experiments=2): + """ + mode: single or set + single: sample one instruction + set: sample a set of instructions + """ + low_is_better = False + state = create_initial_state(task_name, start_task_id=1, data_config=data_config, low_is_better=low_is_better, name="") + + user_requirement = generate_task_requirement(task_name, data_config) + exp_pool_path = get_exp_pool_path(task_name, data_config, pool_name="ds_analysis_pool") + exp_pool = InsightGenerator.load_analysis_pool(exp_pool_path) + if mode == "single": + exps = InsightGenerator._random_sample(exp_pool, num_experiments) + exps = [exp["Analysis"] for exp in exps] + elif mode == "set": + exp_set = InsightGenerator.sample_instruction_set(exp_pool) + exp_set_text = "\n".join([f"{exp['task_id']}: {exp['Analysis']}" for exp in exp_set]) + exps = [exp_set_text] * num_experiments + else: + raise ValueError(f"Invalid mode: {mode}") + + scores = [] + for i in range(num_experiments): + di = ResearchAssistant(node_id=str(i), use_reflection=use_reflection) + di.role_dir = f"{di.role_dir}_{task_name}" + requirement = user_requirement + EXPS_PROMPT.format(experience=exps[i]) + print(requirement) + await di.run(requirement) + score = await di.get_score() + score = evaluate_test(score, state) + + scores.append(score) + + + with open(f"results/{task_name}_scores.json", "w") as f: + # save scores and corresponding insights + results = {"avg_score": sum([score["test_score"] for score in scores if score])/num_experiments, + "max_score": max([score["test_score"] for score in scores]), + "scores": scores, "insights": exps} + json.dump(results, f, indent=4) + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--task", type=str, default="titanic") + parser.add_argument("--use_reflection", dest="use_reflection", action="store_true") + parser.add_argument("--no_use_reflection", dest="use_reflection", action="store_false") + parser.set_defaults(use_reflection=True) + parser.add_argument("--mode", type=str, default="single") + parser.add_argument("--num_experiments", type=int, default=2) + return parser.parse_args() + + + +if __name__ == "__main__": + args = parse_args() + asyncio.run(main(args.task, use_reflection=args.use_reflection, mode=args.mode, num_experiments=args.num_experiments)) diff --git a/expo/run_experiment.py b/expo/run_experiment.py new file mode 100644 index 000000000..e75897f5a --- /dev/null +++ b/expo/run_experiment.py @@ -0,0 +1,44 @@ +from expo.MCTS import MCTS, Node, initialize_di_root_node +from expo.utils import load_data_config, generate_task_requirement +from expo.evaluation.visualize_mcts import get_tree_text +import asyncio +import argparse + + +def get_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--name", type=str, default="") + get_di_args(parser) + get_mcts_args(parser) + get_aug_exp_args(parser) + + + return parser.parse_args() + + +def get_mcts_args(parser): + parser.add_argument("--load_tree", dest="load_tree", action="store_true") + parser.add_argument("--no_load_tree", dest="load_tree", action="store_false") + parser.set_defaults(load_tree=True) + parser.add_argument("--rollout", type=int, default=3) + +def get_aug_exp_args(parser): + parser.add_argument("--aug_mode", type=str, default="single", choices=["single", "set"]) + parser.add_argument("--num_experiments", type=int, default=2) + +
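+# Example invocation (illustrative; main() below is still a stub, so this only +# parses arguments until the experiment logic is implemented): +# python expo/run_experiment.py --task titanic --rollout 5 --no_load_tree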
default="titanic") + parser.add_argument("--low_is_better", dest="low_is_better", action="store_true") + parser.set_defaults(low_is_better=False) + parser.add_argument("--reflection", dest="reflection", action="store_true") + parser.add_argument("--no_reflection", dest="reflection", action="store_false") + parser.set_defaults(reflection=True) + + +async def main(args): + pass + +if __name__ == "__main__": + args = get_args() + asyncio.run(main(args)) \ No newline at end of file diff --git a/expo/run_mcts.py b/expo/run_mcts.py new file mode 100644 index 000000000..0c0c486db --- /dev/null +++ b/expo/run_mcts.py @@ -0,0 +1,48 @@ +from expo.MCTS import MCTS, Node, initialize_di_root_node +from expo.utils import load_data_config, generate_task_requirement +from expo.evaluation.visualize_mcts import get_tree_text +import asyncio +import argparse + + +def get_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--task", type=str, default="titanic") + parser.add_argument("--low_is_better", dest="low_is_better", action="store_true") + parser.set_defaults(low_is_better=False) + parser.add_argument("--load_tree", dest="load_tree", action="store_true") + parser.add_argument("--no_load_tree", dest="load_tree", action="store_false") + parser.set_defaults(load_tree=True) + parser.add_argument("--reflection", dest="reflection", action="store_true") + parser.add_argument("--no_reflection", dest="reflection", action="store_false") + parser.set_defaults(reflection=True) + parser.add_argument("--rollout", type=int, default=3) + parser.add_argument("--name", type=str, default="") + return parser.parse_args() + + +data_config = load_data_config() + +if __name__ == "__main__": + args = get_args() + requirement = generate_task_requirement(args.task, data_config) + print(requirement) + + # role, root_node = initialize_di_root_node(requirement, data_config) + # asyncio.run(role.run(requirement)) + + # asyncio.run(root_node.run_node()) + mcts = MCTS(root_node=None, max_depth=5) + best_node = asyncio.run(mcts.search(args.task, data_config, + low_is_better=args.low_is_better, load_tree=args.load_tree, + reflection=args.reflection, rollout=args.rollout, name=args.name)) + text, num_generated_codes = get_tree_text(mcts.root_node) + print(text) + print(f"Generated {num_generated_codes} unique codes.") + + with open(f"results/{args.task}_tree{args.name}.txt", "w") as f: + f.write(f"Generated {num_generated_codes} unique codes.\n") + f.write(f"Best node: {best_node}, score: {best_node.raw_reward}\n") + f.write(text) + + diff --git a/expo/utils.py b/expo/utils.py new file mode 100644 index 000000000..ac4a64697 --- /dev/null +++ b/expo/utils.py @@ -0,0 +1,150 @@ +import yaml +from examples.MCTS_test.dataset import get_user_requirement, get_split_dataset_path +from metagpt.roles.role import Role +from metagpt.actions.di.execute_nb_code import ExecuteNbCode +from metagpt.utils.save_code import save_code_file +# from nbclient import NotebookClient +from nbformat.notebooknode import NotebookNode +import nbformat +from pathlib import Path +from loguru import logger as _logger +from datetime import datetime +import sys +import os +import re + +TASK_PROMPT = """\ +# User requirement +{user_requirement} +**Attention** Please do not leak the target label in any form during training. + +## Saving Dev and Test Predictions +Save the prediction results of the dev set and test set in `dev_predictions.csv` and `test_predictions.csv` respectively in the output directory BEFORE printig out the results. 
+TASK_PROMPT = """\
+# User requirement
+{user_requirement}
+**Attention** Please do not leak the target label in any form during training.
+
+## Saving Dev and Test Predictions
+Save the prediction results of the dev set and the test set in `dev_predictions.csv` and `test_predictions.csv` respectively in the output directory BEFORE printing out the results.
+The file should contain a single `target` column with the predicted values.
+Make sure the prediction results are in the same format as the target column in the training set. The labels should be transformed back to the original format if any transformation was applied during training.
+
+## Output Training Set Performance
+Make sure the performance of the model is printed in Python in the last step, even if it has already been printed in previous steps. The value should be a float number.
+Print the training set performance in the last step, using this format:
+```python
+...
+print("Train score:", train_score)
+```
+
+# Data dir
+training: {train_path}
+dev: {dev_path}
+testing: {test_path}
+
+# Output dir
+{output_dir}
+
+"""
+
+def load_data_config(file_path="data.yaml"):
+    with open(file_path, "r") as stream:
+        data_config = yaml.safe_load(stream)
+    return data_config
+
+DATA_CONFIG = load_data_config()
+
+def get_mcts_logger():
+    print_level = "INFO"
+    logfile_level = "MCTS"
+    name = None  # optional prefix for the log file name
+    current_date = datetime.now()
+    formatted_date = current_date.strftime("%Y%m%d")
+    log_name = f"{name}_{formatted_date}" if name else formatted_date
+
+    _logger.remove()
+    _logger.level(logfile_level, color="", no=25)
+    # A single stderr sink at INFO level already covers the custom "MCTS"
+    # level (no=25), so a second stderr sink would only duplicate output.
+    _logger.add(sys.stderr, level=print_level)
+    _logger.add(Path(DATA_CONFIG["work_dir"]) / DATA_CONFIG["role_dir"] / f"{log_name}.txt", level=logfile_level)
+    return _logger
+
+mcts_logger = get_mcts_logger()
+
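
`get_mcts_logger` above registers a custom `MCTS` level at severity 25, between loguru's built-in INFO (20) and WARNING (30), which is why one INFO-level sink receives both kinds of records. A standalone sketch of the mechanism (using a `DEMO` level name to avoid clashing with the registration above):

import sys
from loguru import logger

logger.remove()                       # drop the default sink for a clean demo
logger.level("DEMO", no=25)           # custom severity between INFO (20) and WARNING (30)
logger.add(sys.stderr, level="INFO")  # one sink catches INFO and DEMO records alike
logger.info("selection phase done")
logger.log("DEMO", "expanded node 0-1")
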
+def get_exp_pool_path(task_name, data_config, pool_name="analysis_pool"):
+    datasets_dir = data_config["datasets_dir"]
+    if task_name in data_config["datasets"]:
+        dataset = data_config["datasets"][task_name]
+        data_path = os.path.join(datasets_dir, dataset["dataset"])
+    else:
+        raise ValueError(
+            f"Dataset {task_name} not found in config file. "
+            f"Available datasets: {list(data_config['datasets'].keys())}"
+        )
+    exp_pool_path = os.path.join(data_path, f"{pool_name}.json")
+    return exp_pool_path
+
+def generate_task_requirement(task_name, data_config):
+    user_requirement = get_user_requirement(task_name, data_config)
+    split_dataset_path = get_split_dataset_path(task_name, data_config)
+    train_path = split_dataset_path["train"]
+    dev_path = split_dataset_path["dev_wo_target"]
+    test_path = split_dataset_path["test_wo_target"]
+    work_dir = data_config["work_dir"]
+    output_dir = f"{work_dir}/{task_name}"
+    user_requirement = TASK_PROMPT.format(user_requirement=user_requirement,
+                                          train_path=train_path, dev_path=dev_path, test_path=test_path,
+                                          output_dir=output_dir)
+    return user_requirement
+
+def change_plan(role, plan):
+    print(f"Change next plan to: {plan}")
+    tasks = role.planner.plan.tasks
+    finished = True
+    for i, task in enumerate(tasks):
+        if not task.code:
+            finished = False
+            break
+    if not finished:
+        tasks[i].plan = plan
+    return finished
+
+def is_cell_to_delete(cell: NotebookNode) -> bool:
+    if "outputs" in cell:
+        for output in cell["outputs"]:
+            if output and "traceback" in output:
+                return True
+    return False
+
+def process_cells(nb: NotebookNode) -> NotebookNode:
+    new_cells = []
+    i = 1
+    for cell in nb["cells"]:
+        if cell["cell_type"] == "code" and not is_cell_to_delete(cell):
+            cell["execution_count"] = i
+            new_cells.append(cell)
+            i += 1
+    nb["cells"] = new_cells
+    return nb
+
+def save_notebook(role: Role, save_dir: str = "", name: str = ""):
+    save_dir = Path(save_dir)
+    nb = process_cells(role.execute_code.nb)
+    save_code_file(name=name, code_context=nb, file_format="ipynb", save_dir=save_dir)
+
+async def load_execute_notebook(role):
+    tasks = role.planner.plan.tasks
+    codes = [task.code for task in tasks if task.code]
+    executor = role.execute_code
+    # await executor.build()
+    for code in codes:
+        outputs, success = await executor.run(code)
+        print(f"Execution success: {success}, Output: {outputs}")
+    print("Finish executing the loaded notebook")
+    return executor
+
+def clean_json_from_rsp(text):
+    pattern = r"```json(.*?)```"
+    matches = re.findall(pattern, text, re.DOTALL)
+    if matches:
+        return "\n".join(matches)
+    return ""
\ No newline at end of file
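
For reference, `clean_json_from_rsp` above extracts the bodies of ```json fences from an LLM reply and joins them; a hypothetical round-trip showing the intended use:

import json
import re

def clean_json_from_rsp(text):
    # Same regex as above, repeated here so the demo is self-contained.
    matches = re.findall(r"```json(.*?)```", text, re.DOTALL)
    return "\n".join(matches) if matches else ""

rsp = 'Here is the analysis:\n```json\n{"insights": ["use median imputation"]}\n```'
payload = json.loads(clean_json_from_rsp(rsp))
assert payload["insights"] == ["use median imputation"]
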