diff --git a/expo/MCTS.py b/expo/MCTS.py index 14f2c4e4b..7c03e2e86 100644 --- a/expo/MCTS.py +++ b/expo/MCTS.py @@ -1,23 +1,28 @@ -import random import math import os -import pandas as pd -from expo.research_assistant import ResearchAssistant -from expo.insights.instruction_generator import InstructionGenerator -from expo.dataset import get_split_dataset_path, generate_task_requirement -from expo.evaluation.evaluation import evaluate_score -from expo.utils import mcts_logger, load_execute_notebook, get_exp_pool_path - -from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender -from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info -import numpy as np import pickle +import random + +import pandas as pd + +from expo.dataset import generate_task_requirement, get_split_dataset_path +from expo.evaluation.evaluation import evaluate_score +from expo.insights.instruction_generator import InstructionGenerator +from expo.research_assistant import ResearchAssistant +from expo.utils import get_exp_pool_path, load_execute_notebook, mcts_logger +from metagpt.tools.tool_recommend import ToolRecommender +from metagpt.utils.common import read_json_file + def initialize_di_root_node(task, data_config, low_is_better=False, reflection=True, name=""): start_task_id = 2 - state = create_initial_state(task, start_task_id=start_task_id, data_config=data_config, low_is_better=low_is_better, name=name) - role = ResearchAssistant(node_id="0", start_task_id=start_task_id, use_reflection=reflection, role_dir=state["node_dir"]) - return role, Node(parent=None, state=state, action=None, value=0) + state = create_initial_state( + task, start_task_id=start_task_id, data_config=data_config, low_is_better=low_is_better, name=name + ) + role = ResearchAssistant( + node_id="0", start_task_id=start_task_id, use_reflection=reflection, role_dir=state["node_dir"] + ) + return role, Node(parent=None, state=state, action=None, value=0) def create_initial_state(task, start_task_id, data_config, low_is_better, name): @@ -36,16 +41,16 @@ def create_initial_state(task, start_task_id, data_config, low_is_better, name): return initial_state -class Node(): - state : dict = {} - action : str = None - value : float = 0 - visited : int = 0 - children : list = [] - normalized_reward : dict = {"train_score": 0, "dev_score": 0, "test_score": 0} +class Node: + state: dict = {} + action: str = None + value: float = 0 + visited: int = 0 + children: list = [] + normalized_reward: dict = {"train_score": 0, "dev_score": 0, "test_score": 0} parent = None - def __init__(self, parent=None, state = None, action=None, value = 0, max_depth=4, **kwargs): + def __init__(self, parent=None, state=None, action=None, value=0, max_depth=4, **kwargs): self.state = state self.action = action self.value = value @@ -66,14 +71,14 @@ class Node(): def __hash__(self): return hash(self.id) - + def save_node(self): - os.makedirs(self.state["node_dir"], exist_ok=True) - with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'wb') as f: + os.makedirs(self.state["node_dir"], exist_ok=True) + with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), "wb") as f: pickle.dump(self, f) - + def load_node(self): - with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'rb') as f: + with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), "rb") as f: return pickle.load(f) def get_depth(self): @@ -94,14 +99,14 @@ class Node(): def is_terminal(self): return int(self.state["start_task_id"]) == self.max_depth + 1 - + def is_fully_expanded(self): return len(self.children) > 0 - + def add_child(self, child_node): self.children.append(child_node) - def update(self, reward:dict, child_node=None): + def update(self, reward: dict, child_node=None): if child_node is not None: child_role = child_node.load_role() role = self.load_role() @@ -117,46 +122,48 @@ class Node(): fname = f"Node-{self.id}.json" role_path = os.path.join(self.state["node_dir"], fname) return role_path - + def load_role(self): role_dict = read_json_file(self.get_role_path()) - if role_dict.get('tool_recommender') is None: - role_dict['tool_recommender'] = ToolRecommender() - elif isinstance(role_dict.get('tool_recommender', {}).get('tools'), dict): - role_dict['tool_recommender']['tools'] = list(role_dict['tool_recommender']['tools'].keys()) + if role_dict.get("tool_recommender") is None: + role_dict["tool_recommender"] = ToolRecommender() + elif isinstance(role_dict.get("tool_recommender", {}).get("tools"), dict): + role_dict["tool_recommender"]["tools"] = list(role_dict["tool_recommender"]["tools"].keys()) role = ResearchAssistant(**role_dict) - if self.parent is not None: # TODO: Check this + if self.parent is not None: # TODO: Check this parent_role = self.parent.load_role() role.update_til_start_task(parent_role, backward=False) role.remap_tasks() return role - + def save_new_role(self, role: ResearchAssistant): role.node_id = self.id - role.start_task_id = self.state['start_task_id'] + role.start_task_id = self.state["start_task_id"] role.state_saved = False - role.change_next_instruction(self.action) + role.change_next_instruction(self.action) mcts_logger.log("MCTS", f"Saving new role: {role.node_id}") role.save_state(static_save=True) - + async def expand(self, max_children): if self.is_fully_expanded(): return insight_geneartor = InstructionGenerator() role = self.load_role() original_instruction = role.get_next_instruction() - insights = await insight_geneartor.generate_new_instructions(task_id=role.start_task_id + 1, - original_instruction=original_instruction, - max_num=max_children, - file_path=self.state["exp_pool_path"]) + insights = await insight_geneartor.generate_new_instructions( + task_id=role.start_task_id + 1, + original_instruction=original_instruction, + max_num=max_children, + file_path=self.state["exp_pool_path"], + ) new_state = self.state.copy() - new_state['start_task_id'] += 1 + new_state["start_task_id"] += 1 for insight in insights: new_role = role.model_copy() node = Node(parent=self, state=new_state, action=insight, value=0) node.save_new_role(new_role) self.add_child(node) - + # def evaluate_test(self): # prediction_fpath = os.path.join(self.state["work_dir"], self.state["task"], "predictions.csv") # predictions = pd.read_csv(prediction_fpath)["target"] @@ -168,7 +175,7 @@ class Node(): # gt = pd.read_csv(os.path.join(split_datasets_dir["test_target"]))["target"] # metric = self.state["dataset_config"]["metric"] # return evaluate_score(predictions, gt, metric) - + def evaluate_prediction(self, split): pred_path = os.path.join(self.state["work_dir"], self.state["task"], f"{split}_predictions.csv") pred_node_path = os.path.join(self.state["node_dir"], f"Node-{self.id}-{split}_predictions.csv") @@ -177,28 +184,38 @@ class Node(): preds.to_csv(pred_node_path, index=False) gt = pd.read_csv(gt_path)["target"] metric = self.state["dataset_config"]["metric"] + # remove original predictions.csv + os.remove(pred_path) return evaluate_score(preds, gt, metric) - + def evaluate_simulation(self, score_dict): - scores = { - "dev_score": self.evaluate_prediction("dev"), - "test_score": self.evaluate_prediction("test") - } + scores = {"dev_score": self.evaluate_prediction("dev"), "test_score": self.evaluate_prediction("test")} score_dict.update(scores) return score_dict - - + async def run_node(self, role=None): if self.is_terminal() and role is not None: if role.state_saved: return self.raw_reward - - if not role: - role = self.load_role() - await load_execute_notebook(role) # execute previous notebook's code - await role.run(with_message='continue') - else: - await role.run(with_message=self.state['requirement']) + + max_retries = 3 + num_runs = 1 + run_finished = False + while num_runs <= max_retries and not run_finished: + try: + if not role: + role = self.load_role() + await load_execute_notebook(role) # execute previous notebook's code + await role.run(with_message="continue") + else: + await role.run(with_message=self.state["requirement"]) + run_finished = True + except Exception as e: + mcts_logger.log("MCTS", f"Error in running the role: {e}") + num_runs += 1 + if not run_finished: + mcts_logger.log("MCTS", f"Role {role.node_id} failed to run") + return {"test_score": 0, "dev_score": 0, "score": 0} score_dict = await role.get_score() score_dict = self.evaluate_simulation(score_dict) self.raw_reward = score_dict @@ -208,18 +225,19 @@ class Node(): if score == -1: return 0 return 1 / (1 + score) + score_dict = {k: normalize_score(v) for k, v in score_dict.items()} self.normalized_reward = score_dict return score_dict - -class MCTS(): - #data_path - root_node : Node = None - children : dict = {} - max_depth : int = 5 - c_explore : float = 1.4 - c_unvisited : float = 0.8 + +class MCTS: + # data_path + root_node: Node = None + children: dict = {} + max_depth: int = 5 + c_explore: float = 1.4 + c_unvisited: float = 0.8 def __init__(self, root_node, max_depth): self.root_node = root_node @@ -229,34 +247,34 @@ class MCTS(): node = self.best_child() mcts_logger.log("MCTS", f"Selected node id: {node.id}") return node - + def best_child(self): def uct(node: Node): n_visits = node.visited if node.visited else self.c_unvisited - avg_value = node.avg_value() if node.visited else node.value/self.c_unvisited + avg_value = node.avg_value() if node.visited else node.value / self.c_unvisited return avg_value + self.c_explore * math.sqrt(math.log(node.parent.visited) / n_visits) + if len(self.children) == 0: return self.root_node all_children = [child for children in self.children.values() for child in children] return max(all_children, key=uct) - async def expand(self, node : Node, max_children=4): + async def expand(self, node: Node, max_children=5): await node.expand(max_children) if node not in self.children or not self.children[node]: self.children[node] = node.children return node.children - - async def simulate(self, node : Node, role=None): + + async def simulate(self, node: Node, role=None): "Returns the reward for a random simulation (to completion) of `node`" mcts_logger.log("MCTS", f"Start simulating node {node.id}:") while node.children: node = random.choice(node.children) reward = await node.run_node(role) - mcts_logger.log("MCTS", f"Simulated node's reward: {reward}") + mcts_logger.log("MCTS", f"Simulated node's reward: {reward}") return reward - - def backpropagate(self, node : Node, reward): + def backpropagate(self, node: Node, reward): child_node = node node.update(reward) node = node.parent @@ -264,34 +282,35 @@ class MCTS(): node.update(reward, child_node) node, child_node = node.parent, node - def best_path(self, root : Node): + def best_path(self, root: Node): best_child = root best_score = 0 - def bfs(node : Node, best_score, best_child : Node, split): + + def bfs(node: Node, best_score, best_child: Node, split): assert split in ["test_score", "dev_score"] if node not in self.children: return best_score, best_child for child in self.children[node]: score = child.normalized_reward[split] - print(child.id, score) + print(child.id, split, score) if score > best_score: best_score = score best_child = child best_score, best_child = bfs(child, best_score, best_child, split) return best_score, best_child + _, best_child = bfs(root, best_score, best_child, "test_score") _, dev_best_child = bfs(root, best_score, best_child, "dev_score") - return {"dev_best": dev_best_child, - "global_best": best_child} - + return {"dev_best": dev_best_child, "global_best": best_child} + def get_num_simulations(self): return self.root_node.visited - async def search(self, task, data_config, name, - rollouts, load_tree=False, low_is_better=False, reflection=False): - - role, root = initialize_di_root_node(task, data_config, low_is_better=low_is_better, reflection=reflection, name=name) + async def search(self, task, data_config, name, rollouts, load_tree=False, low_is_better=False, reflection=False): + role, root = initialize_di_root_node( + task, data_config, low_is_better=low_is_better, reflection=reflection, name=name + ) self.root_node = root tree_loaded = False if load_tree: @@ -300,14 +319,14 @@ class MCTS(): mcts_logger.log("MCTS", f"Tree loaded: {tree_loaded}") if not tree_loaded: - rollouts -= 2 # 2 rollouts for the initial tree + rollouts -= 2 # 2 rollouts for the initial tree if rollouts < 0: raise ValueError("Rollouts must be greater than 2 if there is no tree to load") self.children[root] = [] reward = await self.simulate(root, role) self.backpropagate(root, reward) children = await self.expand(root) - #目前是随机选择1个,后续可以改成多个 + # 目前是随机选择1个,后续可以改成多个 first_leaf = random.choice(children) reward = await self.simulate(first_leaf) self.backpropagate(first_leaf, reward) @@ -325,14 +344,13 @@ class MCTS(): mcts_logger.log("MCTS", f"Terminal node's reward: {reward}") self.backpropagate(node, reward) else: - if node.visited > 0: + if node.visited > 0: children = await self.expand(node) node = random.choice(children) reward = await self.simulate(node) self.backpropagate(node, reward) return self.best_path(root) - def load_tree(self): def load_children_node(node): mcts_logger.log("MCTS", f"Load node {node.id}'s child: {node.children}") @@ -342,14 +360,15 @@ class MCTS(): child.load_node() self.children[child] = child.children load_children_node(child) + # Load all pkl files in the node_dir all_pkl_files = os.listdir(self.root_node.state["node_dir"]) all_pkl_files = [f for f in all_pkl_files if f.endswith(".pkl")] if os.path.exists(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl")): - with open(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl"), 'rb') as f: + with open(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl"), "rb") as f: self.root_node = pickle.load(f) self.children[self.root_node] = self.root_node.children load_children_node(self.root_node) if self.children: return True - return False \ No newline at end of file + return False diff --git a/expo/README.md b/expo/README.md index 2ecf2fd2f..4cc4daf25 100644 --- a/expo/README.md +++ b/expo/README.md @@ -35,18 +35,15 @@ ### Budget ### 提示词使用 -通过执行`dataset.py`中的`generate_task_requirement`函数获取提示词 +- 通过执行`dataset.py`中的`generate_task_requirement`函数获取提示词 +- 每一个数据集里有`dataset_info.json`,里面的内容需要提供给baselines以保证公平 ## 3. Evaluation 运行各个框架,运行后框架需要提供Dev和Test的`dev_predictions.csv`和`test_predictions.csv`, column name为target -两种评估方式 - -1. `evaluation.py` 提供pred和原始的gt(1D iterable)以及需要使用的metric,返回evaluation score - -2. 使用`CustomExperimenter` +- 使用`CustomExperimenter` ``` experimenter = CustomExperimenter(task="titanic") score_dict = experimenter.evaluate_pred_files(dev_pred_path, test_pred_path) @@ -61,12 +58,24 @@ ### AIDE 提供github链接,并说明使用的命令以及参数设置 ### Autogluon +#### Setup +``` +pip install -U pip +pip install -U setuptools wheel + +CPU version of pytorch has smaller footprint - see installation instructions in +pytorch documentation - https://pytorch.org/get-started/locally/ +pip install torch==2.3.1 torchvision==0.18.1 --index-url https://download.pytorch.org/whl/cpu + +pip install autogluon +``` + 提供github链接,并说明使用的命令以及参数设置 ### Base DI For setup, check 5. -- `python run_experiment.py --exp_mode base --task titanic` +- `python run_experiment.py --exp_mode base --task titanic --num_experiments 10` ### DI RandomSearch diff --git a/expo/dataset.py b/expo/dataset.py index 62665d297..fee1199a9 100644 --- a/expo/dataset.py +++ b/expo/dataset.py @@ -1,12 +1,14 @@ -import openml -from pathlib import Path -from sklearn.model_selection import train_test_split -import os -import json -import yaml -import pandas as pd -from expo.insights.solution_designer import SolutionDesigner import asyncio +import json +import os +from pathlib import Path + +import openml +import pandas as pd +import yaml +from sklearn.model_selection import train_test_split + +from expo.insights.solution_designer import SolutionDesigner BASE_USER_REQUIREMENT = """\ This is a {datasetname} dataset. Your goal is to predict the target column `{target_col}`. @@ -20,7 +22,7 @@ TASK_PROMPT = """\ **Attention** 1. Please do not leak the target label in any form during training. 2. Dev and Test sets do not have the target column. -3. You should perform transformations on all sets at the same step. +3. You should perform transformations on train, dev, and test sets at the same time (it's a good idea to define functions for this and avoid code repetition). 4. If labels are transformed during training, they should be transformed back to the original format before saving the predictions. ## Saving Dev and Test Predictions @@ -38,9 +40,9 @@ print("Train score:", train_score) ``` # Data dir -training: {train_path} -dev: {dev_path} -testing: {test_path} +training (with labels): {train_path} +dev (without labels): {dev_path} +testing (without labels): {test_path} # Output dir {output_dir} @@ -59,14 +61,12 @@ OPENML_DATASET_IDS = [ 41980, 42225, 531, - # cls 41143, 31, 42733, 41162, 1067, - # multi cls 40498, 40982, @@ -79,14 +79,15 @@ CUSTOM_DATASETS = [ ("04_titanic", "Survived"), ("05_house-prices-advanced-regression-techniques", "SalePrice"), ("06_santander-customer-transaction-prediction", "target"), - ("07_icr-identify-age-related-conditions", "Class") + ("07_icr-identify-age-related-conditions", "Class"), ] + def get_split_dataset_path(dataset_name, config): - datasets_dir = config['datasets_dir'] - if dataset_name in config['datasets']: - dataset = config['datasets'][dataset_name] - data_path = os.path.join(datasets_dir, dataset['dataset']) + datasets_dir = config["datasets_dir"] + if dataset_name in config["datasets"]: + dataset = config["datasets"][dataset_name] + data_path = os.path.join(datasets_dir, dataset["dataset"]) split_datasets = { "train": os.path.join(data_path, "split_train.csv"), "dev": os.path.join(data_path, "split_dev.csv"), @@ -98,32 +99,39 @@ def get_split_dataset_path(dataset_name, config): } return split_datasets else: - raise ValueError(f"Dataset {dataset_name} not found in config file. Available datasets: {config['datasets'].keys()}") + raise ValueError( + f"Dataset {dataset_name} not found in config file. Available datasets: {config['datasets'].keys()}" + ) + def get_user_requirement(task_name, config): - datasets_dir = config['datasets_dir'] - if task_name in config['datasets']: - dataset = config['datasets'][task_name] - data_path = os.path.join(datasets_dir, dataset['dataset']) - user_requirement = dataset['user_requirement'] + datasets_dir = config["datasets_dir"] + if task_name in config["datasets"]: + dataset = config["datasets"][task_name] + data_path = os.path.join(datasets_dir, dataset["dataset"]) + user_requirement = dataset["user_requirement"] return data_path, user_requirement else: - raise ValueError(f"Dataset {task_name} not found in config file. Available datasets: {config['datasets'].keys()}") + raise ValueError( + f"Dataset {task_name} not found in config file. Available datasets: {config['datasets'].keys()}" + ) def save_datasets_dict_to_yaml(datasets_dict): with open("datasets.yaml", "w") as file: yaml.dump(datasets_dict, file) + def create_dataset_dict(dataset): dataset_dict = { "dataset": dataset.name, "user_requirement": dataset.create_base_requirement(), "metric": dataset.get_metric(), - "target_col": dataset.target_col + "target_col": dataset.target_col, } return dataset_dict + def generate_task_requirement(task_name, data_config): user_requirement = get_user_requirement(task_name, data_config) split_dataset_path = get_split_dataset_path(task_name, data_config) @@ -132,19 +140,23 @@ def generate_task_requirement(task_name, data_config): test_path = split_dataset_path["test_wo_target"] work_dir = data_config["work_dir"] output_dir = f"{work_dir}/{task_name}" - user_requirement = TASK_PROMPT.format(user_requirement=user_requirement, - train_path=train_path, dev_path=dev_path, test_path=test_path, - output_dir=output_dir) + user_requirement = TASK_PROMPT.format( + user_requirement=user_requirement, + train_path=train_path, + dev_path=dev_path, + test_path=test_path, + output_dir=output_dir, + ) print(user_requirement) return user_requirement class ExpDataset: - description : str = None - metadata : dict = None - dataset_dir : str = None - target_col : str = None - name : str = None + description: str = None + metadata: dict = None + dataset_dir: str = None + target_col: str = None + name: str = None def __init__(self, name, dataset_dir, **kwargs): self.name = name @@ -154,18 +166,23 @@ class ExpDataset: self.save_dataset(target_col=self.target_col) def check_dataset_exists(self): - fnames = ["split_train.csv", "split_dev.csv", "split_test.csv", - "split_dev_wo_target.csv", "split_dev_target.csv", - "split_test_wo_target.csv", "split_test_target.csv"] + fnames = [ + "split_train.csv", + "split_dev.csv", + "split_test.csv", + "split_dev_wo_target.csv", + "split_dev_target.csv", + "split_test_wo_target.csv", + "split_test_target.csv", + ] for fname in fnames: if not os.path.exists(Path(self.dataset_dir, self.name, fname)): return False return True - + def check_datasetinfo_exists(self): return os.path.exists(Path(self.dataset_dir, self.name, "dataset_info.json")) - def get_raw_dataset(self): raw_dir = Path(self.dataset_dir, self.name, "raw") if not os.path.exists(Path(raw_dir, "train.csv")): @@ -173,17 +190,17 @@ class ExpDataset: else: df = pd.read_csv(Path(raw_dir, "train.csv")) return df - + def get_dataset_info(self): raw_df = pd.read_csv(Path(self.dataset_dir, self.name, "raw", "train.csv")) metadata = { - 'NumberOfClasses': raw_df[self.target_col].nunique(), - 'NumberOfFeatures': raw_df.shape[1], - 'NumberOfInstances': raw_df.shape[0], - 'NumberOfInstancesWithMissingValues': int(raw_df.isnull().any(axis=1).sum()), - 'NumberOfMissingValues': int(raw_df.isnull().sum().sum()), - 'NumberOfNumericFeatures': raw_df.select_dtypes(include=['number']).shape[1], - 'NumberOfSymbolicFeatures': raw_df.select_dtypes(include=['object']).shape[1], + "NumberOfClasses": raw_df[self.target_col].nunique(), + "NumberOfFeatures": raw_df.shape[1], + "NumberOfInstances": raw_df.shape[0], + "NumberOfInstancesWithMissingValues": int(raw_df.isnull().any(axis=1).sum()), + "NumberOfMissingValues": int(raw_df.isnull().sum().sum()), + "NumberOfNumericFeatures": raw_df.select_dtypes(include=["number"]).shape[1], + "NumberOfSymbolicFeatures": raw_df.select_dtypes(include=["object"]).shape[1], } df_head_text = raw_df.head().to_string(index=False) @@ -193,10 +210,10 @@ class ExpDataset: "description": "", "target_col": self.target_col, "metadata": metadata, - "df_head": df_head_text + "df_head": df_head_text, } return dataset_info - + def get_metric(self): dataset_info = self.get_dataset_info() num_classes = dataset_info["metadata"]["NumberOfClasses"] @@ -216,7 +233,6 @@ class ExpDataset: return req def save_dataset(self, target_col): - df = self.get_raw_dataset() if not self.check_dataset_exists() or self.force_update: print(f"Saving Dataset {self.name} in {self.dataset_dir}") @@ -249,25 +265,22 @@ class ExpDataset: def split_and_save(self, df, target_col): if not target_col: raise ValueError("Target column not provided") - train, test = train_test_split(df, test_size=1-TRAIN_TEST_SPLIT, random_state=SEED) - train, dev = train_test_split(train, test_size=1-TRAIN_DEV_SPLIT, random_state=SEED) + train, test = train_test_split(df, test_size=1 - TRAIN_TEST_SPLIT, random_state=SEED) + train, dev = train_test_split(train, test_size=1 - TRAIN_DEV_SPLIT, random_state=SEED) self.save_split_datasets(train, "train") self.save_split_datasets(dev, "dev", target_col) self.save_split_datasets(test, "test", target_col) - - + class OpenMLExpDataset(ExpDataset): def __init__(self, name, dataset_dir, dataset_id, **kwargs): self.dataset_id = dataset_id - self.dataset = openml.datasets.get_dataset(self.dataset_id, - download_data=False, - download_qualities=False, - download_features_meta_data=True) + self.dataset = openml.datasets.get_dataset( + self.dataset_id, download_data=False, download_qualities=False, download_features_meta_data=True + ) self.name = self.dataset.name self.target_col = self.dataset.default_target_attribute super().__init__(self.name, dataset_dir, target_col=self.target_col, **kwargs) - def get_raw_dataset(self): dataset = self.dataset @@ -276,7 +289,7 @@ class OpenMLExpDataset(ExpDataset): os.makedirs(raw_dir, exist_ok=True) dataset_df.to_csv(Path(raw_dir, "train.csv"), index=False) return dataset_df - + def get_dataset_info(self): dataset_info = super().get_dataset_info() dataset = self.dataset @@ -290,12 +303,14 @@ class OpenMLExpDataset(ExpDataset): # def __init__(self, name, dataset_dir, dataset_name, **kwargs): # super().__init__(name, dataset_dir, **kwargs) + async def process_dataset(dataset, solution_designer, save_analysis_pool, datasets_dict): if save_analysis_pool: asyncio.run(solution_designer.generate_solutions(dataset.get_dataset_info(), dataset.name)) dataset_dict = create_dataset_dict(dataset) datasets_dict["datasets"][dataset.name] = dataset_dict + if __name__ == "__main__": datasets_dir = "D:/work/automl/datasets" force_update = False diff --git a/expo/evaluation/evaluation.py b/expo/evaluation/evaluation.py index 20a35aa27..16b3acb71 100644 --- a/expo/evaluation/evaluation.py +++ b/expo/evaluation/evaluation.py @@ -1,11 +1,12 @@ -from sklearn.metrics import f1_score, accuracy_score, roc_auc_score, mean_squared_error import numpy as np +from sklearn.metrics import accuracy_score, f1_score, mean_squared_error, roc_auc_score + def evaluate_score(pred, gt, metric): if metric == "accuracy": return accuracy_score(gt, pred) elif metric == "f1": - unique_classes = np.unique(gt) + unique_classes = sorted(list(np.unique(gt))) if 1 in unique_classes and 0 in unique_classes: pos_label = 1 else: @@ -20,4 +21,4 @@ def evaluate_score(pred, gt, metric): elif metric == "log rmse": return mean_squared_error(np.log1p(gt), np.log1p(pred), squared=False) else: - raise ValueError(f"Metric {metric} not supported") \ No newline at end of file + raise ValueError(f"Metric {metric} not supported") diff --git a/expo/evaluation/visualize_mcts.py b/expo/evaluation/visualize_mcts.py index 6e38576e2..d310036c0 100644 --- a/expo/evaluation/visualize_mcts.py +++ b/expo/evaluation/visualize_mcts.py @@ -1,7 +1,7 @@ - -from expo.MCTS import Node, MCTS import textwrap +from expo.MCTS import Node + NODE_TEMPLATE = """\ [Node {id}] Plans: @@ -11,21 +11,23 @@ Score: {score}, Visits: {num_visits} """ + def get_role_plans(role): plans = role.planner.plan.tasks instruct_plans = [f"{i+1}. {task.instruction}" for i, task in enumerate(plans)] return instruct_plans -def get_tree_text(node : Node): +def get_tree_text(node: Node): role_dict = {} code_set = set() + def load_role(node): if node.id not in role_dict: role_dict[node.id] = node.load_role() return role_dict[node.id] - - def visualize_node(node : Node, previous_plans=None): + + def visualize_node(node: Node, previous_plans=None): role = load_role(node) node_id = node.id plans = role.planner.plan.tasks @@ -36,7 +38,9 @@ def get_tree_text(node : Node): simulated = role.state_saved score = f"avg score: {node.avg_value()}, simulated score: {node.raw_reward}" num_visits = node.visited - return NODE_TEMPLATE.format(id=node_id, plans=instruct_plans_text, simulated=simulated, score=score, num_visits=num_visits) + return NODE_TEMPLATE.format( + id=node_id, plans=instruct_plans_text, simulated=simulated, score=score, num_visits=num_visits + ) def visualize_tree(node, depth=0, previous_plans=None): text = "" @@ -46,9 +50,10 @@ def get_tree_text(node : Node): code_set.update({task.instruction for task in role.planner.plan.tasks}) previous_plans = get_role_plans(role) for child in node.children: - text += textwrap.indent(visualize_tree(child, depth+1, previous_plans), "\t") + text += textwrap.indent(visualize_tree(child, depth + 1, previous_plans), "\t") return text - - return visualize_tree(node), len(code_set) - + num_simulations = node.visited + text = f"Number of simulations: {num_simulations}\n" + text += visualize_tree(node) + return text, len(code_set) diff --git a/expo/experimenter/__init__.py b/expo/experimenter/__init__.py index 2eab295f7..e69de29bb 100644 --- a/expo/experimenter/__init__.py +++ b/expo/experimenter/__init__.py @@ -1,4 +0,0 @@ -from .experimenter import Experimenter -from .mcts import MCTSExperimenter -from .aug import AugExperimenter -from .custom import CustomExperimenter \ No newline at end of file diff --git a/expo/experimenter/aug.py b/expo/experimenter/aug.py index 956849717..9b14123d3 100644 --- a/expo/experimenter/aug.py +++ b/expo/experimenter/aug.py @@ -1,9 +1,8 @@ from experimenter import Experimenter -from expo.MCTS import create_initial_state -from expo.dataset import generate_task_requirement -from expo.utils import mcts_logger, load_execute_notebook, get_exp_pool_path + from expo.insights.instruction_generator import InstructionGenerator from expo.research_assistant import ResearchAssistant +from expo.utils import get_exp_pool_path EXPS_PROMPT = """ When doing the tasks, you can refer to the insights below: @@ -12,14 +11,12 @@ When doing the tasks, you can refer to the insights below: """ - - class AugExperimenter(Experimenter): - result_path : str = "results/aug" + result_path: str = "results/aug" async def run_experiment(self): - state = create_initial_state(self.args.task, start_task_id=1, data_config=self.data_config, low_is_better=self.args.low_is_better, name="") - user_requirement = state["requirement"] + # state = create_initial_state(self.args.task, start_task_id=1, data_config=self.data_config, low_is_better=self.args.low_is_better, name="") + user_requirement = self.state["requirement"] exp_pool_path = get_exp_pool_path(self.args.task, self.data_config, pool_name="ds_analysis_pool") exp_pool = InstructionGenerator.load_analysis_pool(exp_pool_path) if self.args.aug_mode == "single": @@ -31,30 +28,27 @@ class AugExperimenter(Experimenter): exps = [exp_set_text] * self.args.num_experiments else: raise ValueError(f"Invalid mode: {self.args.aug_mode}") - + results = [] for i in range(self.args.num_experiments): di = ResearchAssistant(node_id=str(i), use_reflection=self.args.reflection) di.role_dir = f"{di.role_dir}_{self.args.task}" requirement = user_requirement + EXPS_PROMPT.format(experience=exps[i]) print(requirement) - await di.run(requirement) - score_dict = await di.get_score() - score_dict = self.evaluate(score_dict, state) - results.append({ - "idx": i, - "score_dict": score_dict, - "aug_mode": self.args.aug_mode, - "insights" : exps[i], - "user_requirement": requirement, - "args": vars(self.args) - }) + score_dict = await self.run_di(di, requirement) + results.append( + { + "idx": i, + "score_dict": score_dict, + "aug_mode": self.args.aug_mode, + "insights": exps[i], + "user_requirement": requirement, + "args": vars(self.args), + } + ) scores = [result["score_dict"]["test_score"] for result in results] avg_score = sum(scores) / len(scores) best_score = max(scores) if not self.args.low_is_better else min(scores) best_score_idx = scores.index(best_score) results.insert(0, {"avg_score": avg_score, "best_score": best_score, "best_score_idx": best_score_idx}) self.save_result(results) - - - \ No newline at end of file diff --git a/expo/experimenter/autogluon.py b/expo/experimenter/autogluon.py new file mode 100644 index 000000000..4f5d151ef --- /dev/null +++ b/expo/experimenter/autogluon.py @@ -0,0 +1,33 @@ +from autogluon.tabular import TabularDataset, TabularPredictor + +from expo.experimenter.custom import CustomExperimenter + + +class AGRunner: + preset = "best_quality" + time_limit = 500 + + def __init__(self, datasets): + self.datasets = datasets + + def run(self): + train_path = self.datasets["train"] + test_wo_target_path = self.datasets["test_wo_target"] + dev_wo_target_path = self.datasets["dev_wo_target"] + target_col = self.state["dataset_config"]["target_col"] + train_data = TabularDataset(train_path) + test_data = TabularDataset(test_wo_target_path) + dev_data = TabularDataset(dev_wo_target_path) + + predictor = TabularPredictor(label=target_col).fit(train_data, presets=self.preset, time_limit=self.time_limit) + test_preds = predictor.predict(test_data) + dev_preds = predictor.predict(dev_data) + return {"test_preds": test_preds, "dev_preds": dev_preds} + + +class GluonExperimenter(CustomExperimenter): + result_path: str = "results/autogluon" + + def __init__(self, args, **kwargs): + super().__init__(args, **kwargs) + self.framework = AGRunner(self.datasets) diff --git a/expo/experimenter/custom.py b/expo/experimenter/custom.py index ff5ba3546..ba009bdb0 100644 --- a/expo/experimenter/custom.py +++ b/expo/experimenter/custom.py @@ -1,35 +1,36 @@ -from expo.experimenter import Experimenter -from expo.MCTS import create_initial_state -from expo.evaluation.evaluation import evaluate_score -import pandas as pd import os +import pandas as pd + +from expo.evaluation.evaluation import evaluate_score +from expo.experimenter import Experimenter +from expo.MCTS import create_initial_state + + class CustomExperimenter(Experimenter): - result_path : str = "results/custom" - + result_path: str = "results/custom" + def __init__(self, args, **kwargs): super().__init__(args, **kwargs) - self.framework = kwargs["framework"] # todo + self.framework = kwargs["framework"] # todo self.task = kwargs.get("task", self.args.task) self.low_is_better = kwargs.get("low_is_better", self.args.low_is_better) self.name = kwargs.get("name", "") self.result_path = f"results/custom_{self.name}" - self.state = create_initial_state(self.task, start_task_id=1, data_config=self.data_config, low_is_better=self.low_is_better, name=self.name) - - async def run_experiment(self): + self.state = create_initial_state( + self.task, start_task_id=1, data_config=self.data_config, low_is_better=self.low_is_better, name=self.name + ) + + def run_experiment(self): user_requirement = self.state["requirement"] - preds = await self.framework.run(user_requirement) + preds = self.framework.run(user_requirement) test_preds = preds["test_preds"] dev_preds = preds["dev_preds"] score_dict = { "dev_score": self.evaluate_predictions(dev_preds, "dev"), - "test_score": self.evaluate_predictions(test_preds, "test") - } - results = { - "score_dict": score_dict, - "user_requirement": user_requirement, - "args": vars(self.args) + "test_score": self.evaluate_predictions(test_preds, "test"), } + results = {"score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)} self.save_result(results) def evaluate_pred_files(self, dev_pred_path, test_pred_path): @@ -37,7 +38,7 @@ class CustomExperimenter(Experimenter): test_preds = pd.read_csv(test_pred_path)["target"] score_dict = { "dev_score": self.evaluate_score(dev_preds, "dev"), - "test_score": self.evaluate_score(test_preds, "test") + "test_score": self.evaluate_score(test_preds, "test"), } return score_dict @@ -46,8 +47,7 @@ class CustomExperimenter(Experimenter): gt_path = os.path.join(self.state["datasets_dir"][f"{split}_target"]) gt = pd.read_csv(gt_path)["target"] score = evaluate_score(preds, gt, metric) - return score - + return score def load_datasets(self): train_path = self.state["datasets_dir"]["train"] @@ -57,4 +57,3 @@ class CustomExperimenter(Experimenter): dev = pd.read_csv(dev_path) test = pd.read_csv(test_path) return train, dev, test - diff --git a/expo/experimenter/experimenter.py b/expo/experimenter/experimenter.py index 678d48d6a..83dde80b9 100644 --- a/expo/experimenter/experimenter.py +++ b/expo/experimenter/experimenter.py @@ -1,43 +1,77 @@ -from expo.utils import DATA_CONFIG -import os -import pandas as pd -from expo.evaluation.evaluation import evaluate_score import datetime import json +import os + +import pandas as pd + +from expo.evaluation.evaluation import evaluate_score from expo.MCTS import create_initial_state from expo.research_assistant import ResearchAssistant +from expo.utils import DATA_CONFIG class Experimenter: - result_path : str = "results/base" + result_path: str = "results/base" data_config = DATA_CONFIG - def __init__(self, args, **kwargs): self.args = args self.start_time = datetime.datetime.now().strftime("%Y%m%d%H%M") + self.state = create_initial_state( + self.args.task, + start_task_id=1, + data_config=self.data_config, + low_is_better=self.args.low_is_better, + name="", + ) + + async def run_di(self, di, user_requirement): + max_retries = 3 + num_runs = 1 + run_finished = False + while num_runs <= max_retries and not run_finished: + try: + await di.run(user_requirement) + score_dict = await di.get_score() + score_dict = self.evaluate(score_dict, self.state) + run_finished = True + except Exception as e: + print(f"Error: {e}") + num_runs += 1 + if not run_finished: + score_dict = {"train_score": -1, "dev_score": -1, "test_score": -1, "score": -1} + return score_dict async def run_experiment(self): - state = create_initial_state(self.args.task, start_task_id=1, data_config=self.data_config, low_is_better=self.args.low_is_better, name="") + state = self.state user_requirement = state["requirement"] results = [] for i in range(self.args.num_experiments): di = ResearchAssistant(node_id="0", use_reflection=self.args.reflection) - await di.run(user_requirement) - score_dict = await di.get_score() - score_dict = self.evaluate(score_dict, state) - results.append({ - "idx": i, - "score_dict": score_dict, - "user_requirement": user_requirement, - "args": vars(self.args) - }) - scores = [result["score_dict"]["test_score"] for result in results] - avg_score = sum(scores) / len(scores) - best_score = max(scores) if not self.args.low_is_better else min(scores) - best_score_idx = scores.index(best_score) - results.insert(0, {"avg_score": avg_score, "best_score": best_score, "best_score_idx": best_score_idx}) + score_dict = await self.run_di(di, user_requirement) + results.append( + {"idx": i, "score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)} + ) + self.save_result(results) # save intermediate results + dev_scores = [result["score_dict"]["dev_score"] for result in results] + best_dev_score = max(dev_scores) if not self.args.low_is_better else min(dev_scores) + best_score_idx = dev_scores.index(best_dev_score) + + test_scores = [result["score_dict"]["test_score"] for result in results] + avg_score = sum(test_scores) / len(test_scores) + global_best_score = max(test_scores) if not self.args.low_is_better else min(test_scores) + + results.insert( + 0, + { + "best_dev_score": best_dev_score, + "best_score_idx": best_score_idx, + "best_test_score": test_scores[best_score_idx], + "avg_test_score": avg_score, + "best_score": global_best_score, + }, + ) self.save_result(results) def evaluate_prediction(self, split, state): @@ -49,8 +83,9 @@ class Experimenter: preds.to_csv(pred_node_path, index=False) gt = pd.read_csv(gt_path)["target"] metric = state["dataset_config"]["metric"] + os.remove(pred_path) return evaluate_score(preds, gt, metric) - + def evaluate(self, score_dict, state): scores = { "dev_score": self.evaluate_prediction("dev", state), @@ -59,8 +94,15 @@ class Experimenter: score_dict.update(scores) return score_dict - def save_result(self, result): + end_time = datetime.datetime.now().strftime("%Y%m%d%H%M") + time_info = { + "start_time": self.start_time, + "end_time": end_time, + "duration (minutes)": float(end_time) - float(self.start_time), + } + result = result.copy() + result.insert(0, time_info) os.makedirs(self.result_path, exist_ok=True) with open(f"{self.result_path}/{self.args.exp_mode}-{self.args.task}_{self.start_time}.json", "w") as f: json.dump(result, f, indent=4) diff --git a/expo/experimenter/mcts.py b/expo/experimenter/mcts.py index 0159abe24..921b81412 100644 --- a/expo/experimenter/mcts.py +++ b/expo/experimenter/mcts.py @@ -1,45 +1,47 @@ -from expo.experimenter import Experimenter -from expo.dataset import generate_task_requirement -from expo.MCTS import MCTS from expo.evaluation.visualize_mcts import get_tree_text +from expo.experimenter import Experimenter +from expo.MCTS import MCTS class MCTSExperimenter(Experimenter): - result_path : str = "results/mcts" + result_path: str = "results/mcts" + async def run_experiment(self): mcts = MCTS(root_node=None, max_depth=5) - best_nodes = await mcts.search(self.args.task, self.data_config, - low_is_better=self.args.low_is_better, - load_tree=self.args.load_tree, - reflection=self.args.reflection, - rollouts=self.args.rollouts, - name=self.args.name) + best_nodes = await mcts.search( + self.args.task, + self.data_config, + low_is_better=self.args.low_is_better, + load_tree=self.args.load_tree, + reflection=self.args.reflection, + rollouts=self.args.rollouts, + name=self.args.name, + ) best_node = best_nodes["global_best"] dev_best_node = best_nodes["dev_best"] - + text, num_generated_codes = get_tree_text(mcts.root_node) text += f"Generated {num_generated_codes} unique codes.\n" text += f"Best node: {best_node}, score: {best_node.raw_reward}\n" text += f"Dev best node: {dev_best_node}, score: {dev_best_node.raw_reward}\n" print(text) - if self.args.rollouts > 0: - self.save_tree(text) + self.save_tree(text) - results = { + results = [ + { "best_node": best_node.id, "best_node_score": best_node.raw_reward, "dev_best_node": dev_best_node.id, "dev_best_node_score": dev_best_node.raw_reward, "num_generated_codes": num_generated_codes, "user_requirement": best_node.state["requirement"], - "args": vars(self.args) + "tree_text": text, + "args": vars(self.args), } - self.save_result(results) + ] + self.save_result(results) - - def save_tree(self, tree_text): fpath = f"{self.result_path}/{self.args.task}_tree_{self.args.name}.txt" with open(fpath, "w") as f: f.write(tree_text) - diff --git a/expo/insights/instruction_generator.py b/expo/insights/instruction_generator.py index 4f4155ff8..065565c89 100644 --- a/expo/insights/instruction_generator.py +++ b/expo/insights/instruction_generator.py @@ -1,3 +1,10 @@ +import json +import random + +from expo.utils import clean_json_from_rsp, load_data_config, mcts_logger +from metagpt.llm import LLM +from metagpt.schema import Message + REFLECTION_SYSTEM_MSG = "As a Kaggle grandmaster participating in a competition, you need to analyze your experience and propose evolutionary points that are more likely to improve the performance of baseline code." CHANGE_INSTRUCTION = """ @@ -18,12 +25,6 @@ Rewrite the original instruction according to the insights ``` """ -import re -import random -import json -from metagpt.llm import LLM -from metagpt.schema import Message -from expo.utils import load_data_config, mcts_logger, clean_json_from_rsp DATA_CONFIG = load_data_config() @@ -31,7 +32,7 @@ class InstructionGenerator: data_config = DATA_CONFIG @staticmethod - def load_json_data(json_dir): + def load_json_data(json_dir): with open(json_dir, "r") as file: json_data = json.load(file) return json_data @@ -39,7 +40,7 @@ class InstructionGenerator: @staticmethod def _random_sample(analysis, num_samples): return random.sample(analysis, num_samples) - + @staticmethod def sample_instruction_set(data): data_dict = {} @@ -52,12 +53,12 @@ class InstructionGenerator: for task_id in sorted(data_dict.keys()): instruction_set.append(random.choice(data_dict[task_id])) return instruction_set - + @staticmethod def format_output(rsp): rsp_list = [] - new_data = [] - rsp_list.append(rsp) + new_data = [] + rsp_list.append(rsp) for item in rsp_list: item_dict = json.loads(item) data = { @@ -83,21 +84,19 @@ class InstructionGenerator: new_instructions = [] if len(data) == 0: mcts_logger.log("MCTS", f"No insights available for task {task_id}") - return [original_instruction] # Return the original instruction if no insights are available + return [original_instruction] # Return the original instruction if no insights are available for item in data[:max_num]: insights = item["Analysis"] new_instruction = await InstructionGenerator.generate_new_instruction(original_instruction, insights) new_instructions.append(new_instruction) return new_instructions - + @staticmethod async def generate_new_instruction(original_instruction, insights): prompt = CHANGE_INSTRUCTION.format(instruction=original_instruction, insights=insights) llm = LLM() - context = llm.format_msg([Message(content=prompt, role="user")]) - llm_response = await llm.aask( - context, system_msgs=[REFLECTION_SYSTEM_MSG] - ) + context = llm.format_msg([Message(content=prompt, role="user")]) + llm_response = await llm.aask(context, system_msgs=[REFLECTION_SYSTEM_MSG]) rsp = clean_json_from_rsp(llm_response) new_instruction = json.loads(rsp)["New Instruction"] - return new_instruction \ No newline at end of file + return new_instruction diff --git a/expo/insights/solution_designer.py b/expo/insights/solution_designer.py index e2bf57ae3..fc05afeea 100644 --- a/expo/insights/solution_designer.py +++ b/expo/insights/solution_designer.py @@ -1,10 +1,7 @@ -import re -import random import json -from metagpt.llm import LLM -from metagpt.schema import Message -from expo.utils import clean_json_from_rsp, load_data_config +from expo.utils import clean_json_from_rsp, load_data_config +from metagpt.llm import LLM DATA_CONFIG = load_data_config() @@ -72,56 +69,50 @@ Make sure each method is independent and can be implemented separately. """ KEY_DATASET_FEATURES = [ - 'NumberOfClasses', - 'NumberOfFeatures', - 'NumberOfInstances', - 'NumberOfInstancesWithMissingValues', - 'NumberOfMissingValues', - 'NumberOfNumericFeatures', - 'NumberOfSymbolicFeatures' + "NumberOfClasses", + "NumberOfFeatures", + "NumberOfInstances", + "NumberOfInstancesWithMissingValues", + "NumberOfMissingValues", + "NumberOfNumericFeatures", + "NumberOfSymbolicFeatures", ] -TASK_TO_ID = { - "EDA": 1, - "Data Preprocessing": 2, - "Feature Engineering": 3, - "Model Training": 4, - "Model Evaluation": 5 -} +TASK_TO_ID = {"EDA": 1, "Data Preprocessing": 2, "Feature Engineering": 3, "Model Training": 4, "Model Evaluation": 5} + class SolutionDesigner: - data_dir : str= DATA_CONFIG["datasets_dir"] + data_dir: str = DATA_CONFIG["datasets_dir"] async def generate_solutions(self, dataset_info, dataset_name): llm = LLM() - context = DATASET_INSIGHT_PROMPT.format(dataset=dataset_info["description"], - metadata=self.metadata_builder(dataset_info["metadata"]), - head=dataset_info["df_head"]) + context = DATASET_INSIGHT_PROMPT.format( + dataset=dataset_info["description"], + metadata=self.metadata_builder(dataset_info["metadata"]), + head=dataset_info["df_head"], + ) rsp = await llm.aask(context) rsp = clean_json_from_rsp(rsp) analysis_pool = self.process_analysis_pool(json.loads(rsp)) dataset_path = f"{self.data_dir}/{dataset_name}" self.save_analysis_pool(dataset_path, analysis_pool) - - + def process_analysis_pool(self, insights_rsp): analysis_pool = [] for task_type_insights in insights_rsp: task_type = task_type_insights["task_type"] for insight in task_type_insights["insights"]: - analysis_pool.append({"Analysis": insight, "Category": task_type, "task_id": TASK_TO_ID[task_type]}) + analysis_pool.append({"Analysis": insight, "Category": task_type, "task_id": TASK_TO_ID[task_type]}) return analysis_pool - def metadata_builder(self, qualities): metadata = {} for key in KEY_DATASET_FEATURES: metadata[key] = qualities.get(key, "N/A") metadata_text = json.dumps(metadata, indent=4) return metadata_text - + def save_analysis_pool(self, dataset_path, analysis_pool): fpath = f"{dataset_path}/ds_analysis_pool.json" with open(fpath, "w") as file: json.dump(analysis_pool, file, indent=4) - \ No newline at end of file diff --git a/expo/research_assistant.py b/expo/research_assistant.py index ed935b4b8..b21fc1a55 100644 --- a/expo/research_assistant.py +++ b/expo/research_assistant.py @@ -1,19 +1,16 @@ from __future__ import annotations import json +import os + +from pydantic import model_validator + +from expo.utils import mcts_logger, save_notebook +from metagpt.actions.di.write_analysis_code import WriteAnalysisCode +from metagpt.const import SERDESER_PATH from metagpt.roles.di.data_interpreter import DataInterpreter from metagpt.schema import Message, Task, TaskResult -from metagpt.strategy.task_type import TaskType -from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender -from metagpt.utils.common import CodeParser -from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info -from metagpt.const import MESSAGE_ROUTE_TO_ALL, SERDESER_PATH -from expo.utils import mcts_logger, save_notebook -from pydantic import Field, model_validator -from metagpt.actions.di.write_analysis_code import CheckData, WriteAnalysisCode - -import re -import os +from metagpt.utils.common import CodeParser, write_json_file EXTRACT_SCORE_PROMPT = """ # Code: @@ -36,39 +33,48 @@ If you cannot find the scores, please still return a dictionary with the keys 't ``` """ + class ResearchAssistant(DataInterpreter): node_id: str = "0" start_task_id: int = 1 - state_saved : bool = False - role_dir : str = SERDESER_PATH.joinpath("team", "environment", "roles", f"Experimenter") + state_saved: bool = False + role_dir: str = SERDESER_PATH.joinpath("team", "environment", "roles", "Experimenter") def get_node_name(self): return f"Node-{self.node_id}" - + def get_next_instruction(self): return self.planner.plan.tasks[self.start_task_id] - + def change_next_instruction(self, new_instruction): if new_instruction is not None: self.planner.plan.task_map[str(self.start_task_id)].instruction = new_instruction self.remap_tasks() - def update_til_start_task(self, role: ResearchAssistant, backward: bool = True): if backward: # make sure the previous task instructions are matched - assert self.start_task_id == role.start_task_id - 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}" + assert ( + self.start_task_id == role.start_task_id - 1 + ), f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}" for i in range(self.start_task_id): - if self.planner.plan.task_map[str(self.start_task_id)].instruction != role.planner.plan.task_map[str(self.start_task_id)].instruction: + if ( + self.planner.plan.task_map[str(self.start_task_id)].instruction + != role.planner.plan.task_map[str(self.start_task_id)].instruction + ): mcts_logger.info("Previous task instructions not matched") self.remap_tasks() return # copy new role's task (self.start_task_id) to current role - self.planner.plan.task_map[str(self.start_task_id)] = role.planner.plan.task_map[str(self.start_task_id)].model_copy() + self.planner.plan.task_map[str(self.start_task_id)] = role.planner.plan.task_map[ + str(self.start_task_id) + ].model_copy() self.remap_tasks() else: - assert self.start_task_id == role.start_task_id + 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}" + assert ( + self.start_task_id == role.start_task_id + 1 + ), f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}" if int(role.planner.plan.current_task_id) > self.start_task_id: for i in range(role.start_task_id): self.planner.plan.task_map[str(i)] = role.planner.plan.task_map[str(i)].model_copy() @@ -86,11 +92,10 @@ class ResearchAssistant(DataInterpreter): json_block = CodeParser.parse_code(block=None, text=rsp) score_dict = json.loads(json_block) return score_dict - @model_validator(mode="after") def set_plan_and_tool(self) -> "Interpreter": - if self.planner.plan.goal != '': + if self.planner.plan.goal != "": self.set_actions([WriteAnalysisCode]) self._set_state(0) print("Plan already exists, skipping initialization.") @@ -116,17 +121,17 @@ class ResearchAssistant(DataInterpreter): self.state_saved = True mcts_logger.log("MCTS", f"Saving state at task {self.start_task_id}") else: - mcts_logger.log("MCTS", f"Static Saving") + mcts_logger.log("MCTS", "Static Saving") stg_path = self.role_dir name = self.get_node_name() role_path = os.path.join(stg_path, f"{name}.json") # 将状态保存为 JSON 文件 write_json_file(role_path, self.model_dump()) - def remap_tasks(self): - self.planner.plan.tasks = [self.planner.plan.task_map[task_id] for task_id in sorted(self.planner.plan.task_map.keys())] - + self.planner.plan.tasks = [ + self.planner.plan.task_map[task_id] for task_id in sorted(self.planner.plan.task_map.keys()) + ] async def run(self, with_message=None) -> Message | None: """Observe, and think and act based on the results of the observation""" @@ -138,13 +143,9 @@ class ResearchAssistant(DataInterpreter): self.rc.working_memory.clear() self.working_memory.clear() # self.rc.todo = WriteAnalysisCode() - rsp = await self.react() + rsp = await self.react() # 发送响应消息给 Environment 对象,以便它将消息传递给订阅者 self.set_todo(None) self.publish_message(rsp) return rsp return await super().run(with_message) - - - - \ No newline at end of file diff --git a/expo/run_exp_augmentation.py b/expo/run_exp_augmentation.py index 3f8eff3b3..7fb174ff7 100644 --- a/expo/run_exp_augmentation.py +++ b/expo/run_exp_augmentation.py @@ -1,15 +1,17 @@ -import os -from expo.research_assistant import ResearchAssistant +import argparse import asyncio -from expo.utils import DATA_CONFIG, get_exp_pool_path +import datetime +import json +import os + +import pandas as pd + from expo.dataset import generate_task_requirement +from expo.evaluation.evaluation import evaluate_score from expo.insights.instruction_generator import InstructionGenerator from expo.MCTS import create_initial_state -from expo.evaluation.evaluation import evaluate_score -import json -import argparse -import pandas as pd -import datetime +from expo.research_assistant import ResearchAssistant +from expo.utils import DATA_CONFIG, get_exp_pool_path EXPS_PROMPT = """ When doing the tasks, you can refer to the insights below: @@ -18,13 +20,14 @@ When doing the tasks, you can refer to the insights below: """ data_config = DATA_CONFIG + def evaluate_test(score, state): datetime_text = datetime.datetime.now().strftime("%Y%m%d%H%M") task_name = state["task"] prediction_fpath = os.path.join(state["work_dir"], task_name, "predictions.csv") predictions = pd.read_csv(prediction_fpath)["target"] # copy predictions.csv to the node_dir - + predictions_node_fpath = os.path.join("results", f"{task_name}-{datetime_text}-predictions.csv") predictions.to_csv(predictions_node_fpath, index=False) # load test_target.csv @@ -35,8 +38,6 @@ def evaluate_test(score, state): return score - - async def main(task_name, use_reflection=True, mode="single", num_experiments=2): """ mode: single or set @@ -44,8 +45,10 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2) set: sample a set of instructions """ low_is_better = False - state = create_initial_state(task_name, start_task_id=1, data_config=data_config, low_is_better=low_is_better, name="") - + state = create_initial_state( + task_name, start_task_id=1, data_config=data_config, low_is_better=low_is_better, name="" + ) + user_requirement = generate_task_requirement(task_name, data_config) exp_pool_path = get_exp_pool_path(task_name, data_config, pool_name="ds_analysis_pool") exp_pool = InstructionGenerator.load_analysis_pool(exp_pool_path) @@ -58,7 +61,7 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2) exps = [exp_set_text] * num_experiments else: raise ValueError(f"Invalid mode: {mode}") - + scores = [] for i in range(num_experiments): di = ResearchAssistant(node_id=str(i), use_reflection=use_reflection) @@ -70,16 +73,18 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2) score = evaluate_test(score, state) scores.append(score) - with open(f"results/{task_name}_scores.json", "w") as f: # save scores and corresponding insights - results = {"avg_score": sum([score["test_score"] for score in scores if score])/num_experiments, - "max_score": max([score["test_score"] for score in scores]), - "scores": scores, "insights": exps} + results = { + "avg_score": sum([score["test_score"] for score in scores if score]) / num_experiments, + "max_score": max([score["test_score"] for score in scores]), + "scores": scores, + "insights": exps, + } json.dump(results, f, indent=4) - - + + def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--task", type=str, default="titanic") @@ -90,8 +95,9 @@ def parse_args(): parser.add_argument("--num_experiments", type=int, default=2) return parser.parse_args() - if __name__ == "__main__": args = parse_args() - asyncio.run(main(args.task, use_reflection=args.use_reflection, mode=args.mode, num_experiments=args.num_experiments)) + asyncio.run( + main(args.task, use_reflection=args.use_reflection, mode=args.mode, num_experiments=args.num_experiments) + ) diff --git a/expo/run_experiment.py b/expo/run_experiment.py index 826019321..f8e58ce4f 100644 --- a/expo/run_experiment.py +++ b/expo/run_experiment.py @@ -1,6 +1,12 @@ -from expo.experimenter import MCTSExperimenter, Experimenter, AugExperimenter, CustomExperimenter -import asyncio import argparse +import asyncio + +from expo.experimenter import ( + AugExperimenter, + CustomExperimenter, + Experimenter, + MCTSExperimenter, +) def get_args(): @@ -19,6 +25,7 @@ def get_mcts_args(parser): parser.set_defaults(load_tree=False) parser.add_argument("--rollouts", type=int, default=5) + def get_aug_exp_args(parser): parser.add_argument("--aug_mode", type=str, default="single", choices=["single", "set"]) parser.add_argument("--num_experiments", type=int, default=1) @@ -31,7 +38,7 @@ def get_di_args(parser): parser.add_argument("--reflection", dest="reflection", action="store_true") parser.add_argument("--no_reflection", dest="reflection", action="store_false") parser.set_defaults(reflection=True) - + async def main(args): if args.exp_mode == "mcts": @@ -46,6 +53,7 @@ async def main(args): raise ValueError(f"Invalid exp_mode: {args.exp_mode}") await experimenter.run_experiment() + if __name__ == "__main__": args = get_args() - asyncio.run(main(args)) \ No newline at end of file + asyncio.run(main(args)) diff --git a/expo/run_mcts.py b/expo/run_mcts.py index 20d4171f7..4577417a9 100644 --- a/expo/run_mcts.py +++ b/expo/run_mcts.py @@ -1,10 +1,9 @@ -from expo.MCTS import MCTS, Node, initialize_di_root_node -from expo.utils import load_data_config -from expo.dataset import generate_task_requirement +import argparse +import asyncio from expo.evaluation.visualize_mcts import get_tree_text -import asyncio -import argparse +from expo.MCTS import MCTS +from expo.utils import load_data_config def get_args(): @@ -35,9 +34,17 @@ if __name__ == "__main__": # asyncio.run(root_node.run_node()) mcts = MCTS(root_node=None, max_depth=5) - best_nodes = asyncio.run(mcts.search(args.task, data_config, - low_is_better=args.low_is_better, load_tree=args.load_tree, - reflection=args.reflection, rollouts=args.rollouts, name=args.name)) + best_nodes = asyncio.run( + mcts.search( + args.task, + data_config, + low_is_better=args.low_is_better, + load_tree=args.load_tree, + reflection=args.reflection, + rollouts=args.rollouts, + name=args.name, + ) + ) best_node = best_nodes["global_best"] dev_best_node = best_nodes["dev_best"] text, num_generated_codes = get_tree_text(mcts.root_node) @@ -49,5 +56,3 @@ if __name__ == "__main__": f.write(f"Best node: {best_node}, score: {best_node.raw_reward}\n") f.write(f"Dev best node: {dev_best_node}, score: {dev_best_node.raw_reward}\n") f.write(text) - - diff --git a/expo/utils.py b/expo/utils.py index 20e3fa7f5..d67ceb5a1 100644 --- a/expo/utils.py +++ b/expo/utils.py @@ -1,50 +1,58 @@ -import yaml -from metagpt.roles.role import Role -from metagpt.actions.di.execute_nb_code import ExecuteNbCode -# from nbclient import NotebookClient -from nbformat.notebooknode import NotebookNode -import nbformat -from pathlib import Path -from loguru import logger as _logger -from datetime import datetime -import sys import os import re +import sys +from datetime import datetime +from pathlib import Path + +import nbformat +import yaml +from loguru import logger as _logger + +# from nbclient import NotebookClient +from nbformat.notebooknode import NotebookNode + +from metagpt.roles.role import Role + def load_data_config(file_path="data.yaml"): - with open(file_path, 'r') as stream: + with open(file_path, "r") as stream: data_config = yaml.safe_load(stream) return data_config + DATA_CONFIG = load_data_config() + def get_mcts_logger(): print_level = "INFO" print_level2 = "MCTS" - logfile_level="MCTS" + logfile_level = "MCTS" name: str = None current_date = datetime.now() formatted_date = current_date.strftime("%Y%m%d") log_name = f"{name}_{formatted_date}" if name else formatted_date # name a log with prefix name _logger.remove() - new_level = _logger.level(logfile_level, color="", no=25) + _logger.level(logfile_level, color="", no=25) _logger.add(sys.stderr, level=print_level) _logger.add(sys.stderr, level=print_level2) _logger.add(Path(DATA_CONFIG["work_dir"]) / DATA_CONFIG["role_dir"] / f"{log_name}.txt", level=logfile_level) _logger.propagate = False return _logger + mcts_logger = get_mcts_logger() def get_exp_pool_path(task_name, data_config, pool_name="analysis_pool"): - datasets_dir = data_config['datasets_dir'] - if task_name in data_config['datasets']: - dataset = data_config['datasets'][task_name] - data_path = os.path.join(datasets_dir, dataset['dataset']) + datasets_dir = data_config["datasets_dir"] + if task_name in data_config["datasets"]: + dataset = data_config["datasets"][task_name] + data_path = os.path.join(datasets_dir, dataset["dataset"]) else: - raise ValueError(f"Dataset {task_name} not found in config file. Available datasets: {data_config['datasets'].keys()}") + raise ValueError( + f"Dataset {task_name} not found in config file. Available datasets: {data_config['datasets'].keys()}" + ) exp_pool_path = os.path.join(data_path, f"{pool_name}.json") return exp_pool_path @@ -60,7 +68,6 @@ def change_plan(role, plan): if not finished: tasks[i].plan = plan return finished - def is_cell_to_delete(cell: NotebookNode) -> bool: @@ -82,12 +89,14 @@ def process_cells(nb: NotebookNode) -> NotebookNode: nb["cells"] = new_cells return nb + def save_notebook(role: Role, save_dir: str = "", name: str = ""): save_dir = Path(save_dir) nb = process_cells(role.execute_code.nb) file_path = save_dir / f"{name}.ipynb" nbformat.write(nb, file_path) + async def load_execute_notebook(role): tasks = role.planner.plan.tasks codes = [task.code for task in tasks if task.code] @@ -99,6 +108,7 @@ async def load_execute_notebook(role): print("Finish executing the loaded notebook") return executor + def clean_json_from_rsp(text): pattern = r"```json(.*?)```" matches = re.findall(pattern, text, re.DOTALL) @@ -106,4 +116,4 @@ def clean_json_from_rsp(text): json_str = "\n".join(matches) return json_str else: - return "" \ No newline at end of file + return ""