diff --git a/expo/MCTS.py b/expo/MCTS.py index ab9957a7a..7c03e2e86 100644 --- a/expo/MCTS.py +++ b/expo/MCTS.py @@ -1,23 +1,28 @@ -import random import math import os -import pandas as pd -from expo.research_assistant import ResearchAssistant -from expo.insights.instruction_generator import InstructionGenerator -from expo.dataset import get_split_dataset_path, generate_task_requirement -from expo.evaluation.evaluation import evaluate_score -from expo.utils import mcts_logger, load_execute_notebook, get_exp_pool_path - -from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender -from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info -import numpy as np import pickle +import random + +import pandas as pd + +from expo.dataset import generate_task_requirement, get_split_dataset_path +from expo.evaluation.evaluation import evaluate_score +from expo.insights.instruction_generator import InstructionGenerator +from expo.research_assistant import ResearchAssistant +from expo.utils import get_exp_pool_path, load_execute_notebook, mcts_logger +from metagpt.tools.tool_recommend import ToolRecommender +from metagpt.utils.common import read_json_file + def initialize_di_root_node(task, data_config, low_is_better=False, reflection=True, name=""): start_task_id = 2 - state = create_initial_state(task, start_task_id=start_task_id, data_config=data_config, low_is_better=low_is_better, name=name) - role = ResearchAssistant(node_id="0", start_task_id=start_task_id, use_reflection=reflection, role_dir=state["node_dir"]) - return role, Node(parent=None, state=state, action=None, value=0) + state = create_initial_state( + task, start_task_id=start_task_id, data_config=data_config, low_is_better=low_is_better, name=name + ) + role = ResearchAssistant( + node_id="0", start_task_id=start_task_id, use_reflection=reflection, role_dir=state["node_dir"] + ) + return role, Node(parent=None, state=state, action=None, value=0) def create_initial_state(task, start_task_id, data_config, low_is_better, name): @@ -36,16 +41,16 @@ def create_initial_state(task, start_task_id, data_config, low_is_better, name): return initial_state -class Node(): - state : dict = {} - action : str = None - value : float = 0 - visited : int = 0 - children : list = [] - normalized_reward : dict = {"train_score": 0, "dev_score": 0, "test_score": 0} +class Node: + state: dict = {} + action: str = None + value: float = 0 + visited: int = 0 + children: list = [] + normalized_reward: dict = {"train_score": 0, "dev_score": 0, "test_score": 0} parent = None - def __init__(self, parent=None, state = None, action=None, value = 0, max_depth=4, **kwargs): + def __init__(self, parent=None, state=None, action=None, value=0, max_depth=4, **kwargs): self.state = state self.action = action self.value = value @@ -66,14 +71,14 @@ class Node(): def __hash__(self): return hash(self.id) - + def save_node(self): - os.makedirs(self.state["node_dir"], exist_ok=True) - with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'wb') as f: + os.makedirs(self.state["node_dir"], exist_ok=True) + with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), "wb") as f: pickle.dump(self, f) - + def load_node(self): - with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'rb') as f: + with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), "rb") as f: return pickle.load(f) def get_depth(self): @@ -94,14 +99,14 @@ class Node(): def is_terminal(self): return 
int(self.state["start_task_id"]) == self.max_depth + 1 - + def is_fully_expanded(self): return len(self.children) > 0 - + def add_child(self, child_node): self.children.append(child_node) - def update(self, reward:dict, child_node=None): + def update(self, reward: dict, child_node=None): if child_node is not None: child_role = child_node.load_role() role = self.load_role() @@ -117,46 +122,48 @@ class Node(): fname = f"Node-{self.id}.json" role_path = os.path.join(self.state["node_dir"], fname) return role_path - + def load_role(self): role_dict = read_json_file(self.get_role_path()) - if role_dict.get('tool_recommender') is None: - role_dict['tool_recommender'] = ToolRecommender() - elif isinstance(role_dict.get('tool_recommender', {}).get('tools'), dict): - role_dict['tool_recommender']['tools'] = list(role_dict['tool_recommender']['tools'].keys()) + if role_dict.get("tool_recommender") is None: + role_dict["tool_recommender"] = ToolRecommender() + elif isinstance(role_dict.get("tool_recommender", {}).get("tools"), dict): + role_dict["tool_recommender"]["tools"] = list(role_dict["tool_recommender"]["tools"].keys()) role = ResearchAssistant(**role_dict) - if self.parent is not None: # TODO: Check this + if self.parent is not None: # TODO: Check this parent_role = self.parent.load_role() role.update_til_start_task(parent_role, backward=False) role.remap_tasks() return role - + def save_new_role(self, role: ResearchAssistant): role.node_id = self.id - role.start_task_id = self.state['start_task_id'] + role.start_task_id = self.state["start_task_id"] role.state_saved = False - role.change_next_instruction(self.action) + role.change_next_instruction(self.action) mcts_logger.log("MCTS", f"Saving new role: {role.node_id}") role.save_state(static_save=True) - + async def expand(self, max_children): if self.is_fully_expanded(): return insight_geneartor = InstructionGenerator() role = self.load_role() original_instruction = role.get_next_instruction() - insights = await insight_geneartor.generate_new_instructions(task_id=role.start_task_id + 1, - original_instruction=original_instruction, - max_num=max_children, - file_path=self.state["exp_pool_path"]) + insights = await insight_geneartor.generate_new_instructions( + task_id=role.start_task_id + 1, + original_instruction=original_instruction, + max_num=max_children, + file_path=self.state["exp_pool_path"], + ) new_state = self.state.copy() - new_state['start_task_id'] += 1 + new_state["start_task_id"] += 1 for insight in insights: new_role = role.model_copy() node = Node(parent=self, state=new_state, action=insight, value=0) node.save_new_role(new_role) self.add_child(node) - + # def evaluate_test(self): # prediction_fpath = os.path.join(self.state["work_dir"], self.state["task"], "predictions.csv") # predictions = pd.read_csv(prediction_fpath)["target"] @@ -168,7 +175,7 @@ class Node(): # gt = pd.read_csv(os.path.join(split_datasets_dir["test_target"]))["target"] # metric = self.state["dataset_config"]["metric"] # return evaluate_score(predictions, gt, metric) - + def evaluate_prediction(self, split): pred_path = os.path.join(self.state["work_dir"], self.state["task"], f"{split}_predictions.csv") pred_node_path = os.path.join(self.state["node_dir"], f"Node-{self.id}-{split}_predictions.csv") @@ -180,21 +187,17 @@ class Node(): # remove original predictions.csv os.remove(pred_path) return evaluate_score(preds, gt, metric) - + def evaluate_simulation(self, score_dict): - scores = { - "dev_score": self.evaluate_prediction("dev"), - "test_score": 
self.evaluate_prediction("test") - } + scores = {"dev_score": self.evaluate_prediction("dev"), "test_score": self.evaluate_prediction("test")} score_dict.update(scores) return score_dict - - + async def run_node(self, role=None): if self.is_terminal() and role is not None: if role.state_saved: return self.raw_reward - + max_retries = 3 num_runs = 1 run_finished = False @@ -202,10 +205,10 @@ class Node(): try: if not role: role = self.load_role() - await load_execute_notebook(role) # execute previous notebook's code - await role.run(with_message='continue') + await load_execute_notebook(role) # execute previous notebook's code + await role.run(with_message="continue") else: - await role.run(with_message=self.state['requirement']) + await role.run(with_message=self.state["requirement"]) run_finished = True except Exception as e: mcts_logger.log("MCTS", f"Error in running the role: {e}") @@ -222,18 +225,19 @@ class Node(): if score == -1: return 0 return 1 / (1 + score) + score_dict = {k: normalize_score(v) for k, v in score_dict.items()} self.normalized_reward = score_dict return score_dict - -class MCTS(): - #data_path - root_node : Node = None - children : dict = {} - max_depth : int = 5 - c_explore : float = 1.4 - c_unvisited : float = 0.8 + +class MCTS: + # data_path + root_node: Node = None + children: dict = {} + max_depth: int = 5 + c_explore: float = 1.4 + c_unvisited: float = 0.8 def __init__(self, root_node, max_depth): self.root_node = root_node @@ -243,34 +247,34 @@ class MCTS(): node = self.best_child() mcts_logger.log("MCTS", f"Selected node id: {node.id}") return node - + def best_child(self): def uct(node: Node): n_visits = node.visited if node.visited else self.c_unvisited - avg_value = node.avg_value() if node.visited else node.value/self.c_unvisited + avg_value = node.avg_value() if node.visited else node.value / self.c_unvisited return avg_value + self.c_explore * math.sqrt(math.log(node.parent.visited) / n_visits) + if len(self.children) == 0: return self.root_node all_children = [child for children in self.children.values() for child in children] return max(all_children, key=uct) - async def expand(self, node : Node, max_children=5): + async def expand(self, node: Node, max_children=5): await node.expand(max_children) if node not in self.children or not self.children[node]: self.children[node] = node.children return node.children - - async def simulate(self, node : Node, role=None): + + async def simulate(self, node: Node, role=None): "Returns the reward for a random simulation (to completion) of `node`" mcts_logger.log("MCTS", f"Start simulating node {node.id}:") while node.children: node = random.choice(node.children) reward = await node.run_node(role) - mcts_logger.log("MCTS", f"Simulated node's reward: {reward}") + mcts_logger.log("MCTS", f"Simulated node's reward: {reward}") return reward - - def backpropagate(self, node : Node, reward): + def backpropagate(self, node: Node, reward): child_node = node node.update(reward) node = node.parent @@ -278,10 +282,11 @@ class MCTS(): node.update(reward, child_node) node, child_node = node.parent, node - def best_path(self, root : Node): + def best_path(self, root: Node): best_child = root best_score = 0 - def bfs(node : Node, best_score, best_child : Node, split): + + def bfs(node: Node, best_score, best_child: Node, split): assert split in ["test_score", "dev_score"] if node not in self.children: return best_score, best_child @@ -293,19 +298,19 @@ class MCTS(): best_child = child best_score, best_child = bfs(child, 
best_score, best_child, split) return best_score, best_child + _, best_child = bfs(root, best_score, best_child, "test_score") _, dev_best_child = bfs(root, best_score, best_child, "dev_score") - return {"dev_best": dev_best_child, - "global_best": best_child} - + return {"dev_best": dev_best_child, "global_best": best_child} + def get_num_simulations(self): return self.root_node.visited - async def search(self, task, data_config, name, - rollouts, load_tree=False, low_is_better=False, reflection=False): - - role, root = initialize_di_root_node(task, data_config, low_is_better=low_is_better, reflection=reflection, name=name) + async def search(self, task, data_config, name, rollouts, load_tree=False, low_is_better=False, reflection=False): + role, root = initialize_di_root_node( + task, data_config, low_is_better=low_is_better, reflection=reflection, name=name + ) self.root_node = root tree_loaded = False if load_tree: @@ -314,14 +319,14 @@ mcts_logger.log("MCTS", f"Tree loaded: {tree_loaded}") if not tree_loaded: - rollouts -= 2 # 2 rollouts for the initial tree + rollouts -= 2 # 2 rollouts for the initial tree if rollouts < 0: raise ValueError("Rollouts must be greater than 2 if there is no tree to load") self.children[root] = [] reward = await self.simulate(root, role) self.backpropagate(root, reward) children = await self.expand(root) - #目前是随机选择1个,后续可以改成多个 + # Currently selects one child at random; this could be extended to multiple children later first_leaf = random.choice(children) reward = await self.simulate(first_leaf) self.backpropagate(first_leaf, reward) @@ -339,14 +344,13 @@ mcts_logger.log("MCTS", f"Terminal node's reward: {reward}") self.backpropagate(node, reward) else: - if node.visited > 0: + if node.visited > 0: children = await self.expand(node) node = random.choice(children) reward = await self.simulate(node) self.backpropagate(node, reward) return self.best_path(root) - def load_tree(self): def load_children_node(node): mcts_logger.log("MCTS", f"Load node {node.id}'s child: {node.children}") @@ -356,14 +360,15 @@ child.load_node() self.children[child] = child.children load_children_node(child) + # Load all pkl files in the node_dir all_pkl_files = os.listdir(self.root_node.state["node_dir"]) all_pkl_files = [f for f in all_pkl_files if f.endswith(".pkl")] if os.path.exists(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl")): - with open(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl"), 'rb') as f: + with open(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl"), "rb") as f: self.root_node = pickle.load(f) self.children[self.root_node] = self.root_node.children load_children_node(self.root_node) if self.children: return True - return False \ No newline at end of file + return False diff --git a/expo/dataset.py b/expo/dataset.py index a43f1292a..fee1199a9 100644 --- a/expo/dataset.py +++ b/expo/dataset.py @@ -1,12 +1,14 @@ -import openml -from pathlib import Path -from sklearn.model_selection import train_test_split -import os -import json -import yaml -import pandas as pd -from expo.insights.solution_designer import SolutionDesigner import asyncio +import json +import os +from pathlib import Path + +import openml +import pandas as pd +import yaml +from sklearn.model_selection import train_test_split + +from expo.insights.solution_designer import SolutionDesigner BASE_USER_REQUIREMENT = """\ This is a {datasetname} dataset. Your goal is to predict the target column `{target_col}`.
@@ -59,14 +61,12 @@ OPENML_DATASET_IDS = [ 41980, 42225, 531, - # cls 41143, 31, 42733, 41162, 1067, - # multi cls 40498, 40982, @@ -79,14 +79,15 @@ CUSTOM_DATASETS = [ ("04_titanic", "Survived"), ("05_house-prices-advanced-regression-techniques", "SalePrice"), ("06_santander-customer-transaction-prediction", "target"), - ("07_icr-identify-age-related-conditions", "Class") + ("07_icr-identify-age-related-conditions", "Class"), ] + def get_split_dataset_path(dataset_name, config): - datasets_dir = config['datasets_dir'] - if dataset_name in config['datasets']: - dataset = config['datasets'][dataset_name] - data_path = os.path.join(datasets_dir, dataset['dataset']) + datasets_dir = config["datasets_dir"] + if dataset_name in config["datasets"]: + dataset = config["datasets"][dataset_name] + data_path = os.path.join(datasets_dir, dataset["dataset"]) split_datasets = { "train": os.path.join(data_path, "split_train.csv"), "dev": os.path.join(data_path, "split_dev.csv"), @@ -98,32 +99,39 @@ def get_split_dataset_path(dataset_name, config): } return split_datasets else: - raise ValueError(f"Dataset {dataset_name} not found in config file. Available datasets: {config['datasets'].keys()}") + raise ValueError( + f"Dataset {dataset_name} not found in config file. Available datasets: {config['datasets'].keys()}" + ) + def get_user_requirement(task_name, config): - datasets_dir = config['datasets_dir'] - if task_name in config['datasets']: - dataset = config['datasets'][task_name] - data_path = os.path.join(datasets_dir, dataset['dataset']) - user_requirement = dataset['user_requirement'] + datasets_dir = config["datasets_dir"] + if task_name in config["datasets"]: + dataset = config["datasets"][task_name] + data_path = os.path.join(datasets_dir, dataset["dataset"]) + user_requirement = dataset["user_requirement"] return data_path, user_requirement else: - raise ValueError(f"Dataset {task_name} not found in config file. Available datasets: {config['datasets'].keys()}") + raise ValueError( + f"Dataset {task_name} not found in config file. 
Available datasets: {config['datasets'].keys()}" + ) def save_datasets_dict_to_yaml(datasets_dict): with open("datasets.yaml", "w") as file: yaml.dump(datasets_dict, file) + def create_dataset_dict(dataset): dataset_dict = { "dataset": dataset.name, "user_requirement": dataset.create_base_requirement(), "metric": dataset.get_metric(), - "target_col": dataset.target_col + "target_col": dataset.target_col, } return dataset_dict + def generate_task_requirement(task_name, data_config): user_requirement = get_user_requirement(task_name, data_config) split_dataset_path = get_split_dataset_path(task_name, data_config) @@ -132,19 +140,23 @@ def generate_task_requirement(task_name, data_config): test_path = split_dataset_path["test_wo_target"] work_dir = data_config["work_dir"] output_dir = f"{work_dir}/{task_name}" - user_requirement = TASK_PROMPT.format(user_requirement=user_requirement, - train_path=train_path, dev_path=dev_path, test_path=test_path, - output_dir=output_dir) + user_requirement = TASK_PROMPT.format( + user_requirement=user_requirement, + train_path=train_path, + dev_path=dev_path, + test_path=test_path, + output_dir=output_dir, + ) print(user_requirement) return user_requirement class ExpDataset: - description : str = None - metadata : dict = None - dataset_dir : str = None - target_col : str = None - name : str = None + description: str = None + metadata: dict = None + dataset_dir: str = None + target_col: str = None + name: str = None def __init__(self, name, dataset_dir, **kwargs): self.name = name @@ -154,18 +166,23 @@ class ExpDataset: self.save_dataset(target_col=self.target_col) def check_dataset_exists(self): - fnames = ["split_train.csv", "split_dev.csv", "split_test.csv", - "split_dev_wo_target.csv", "split_dev_target.csv", - "split_test_wo_target.csv", "split_test_target.csv"] + fnames = [ + "split_train.csv", + "split_dev.csv", + "split_test.csv", + "split_dev_wo_target.csv", + "split_dev_target.csv", + "split_test_wo_target.csv", + "split_test_target.csv", + ] for fname in fnames: if not os.path.exists(Path(self.dataset_dir, self.name, fname)): return False return True - + def check_datasetinfo_exists(self): return os.path.exists(Path(self.dataset_dir, self.name, "dataset_info.json")) - def get_raw_dataset(self): raw_dir = Path(self.dataset_dir, self.name, "raw") if not os.path.exists(Path(raw_dir, "train.csv")): @@ -173,17 +190,17 @@ class ExpDataset: else: df = pd.read_csv(Path(raw_dir, "train.csv")) return df - + def get_dataset_info(self): raw_df = pd.read_csv(Path(self.dataset_dir, self.name, "raw", "train.csv")) metadata = { - 'NumberOfClasses': raw_df[self.target_col].nunique(), - 'NumberOfFeatures': raw_df.shape[1], - 'NumberOfInstances': raw_df.shape[0], - 'NumberOfInstancesWithMissingValues': int(raw_df.isnull().any(axis=1).sum()), - 'NumberOfMissingValues': int(raw_df.isnull().sum().sum()), - 'NumberOfNumericFeatures': raw_df.select_dtypes(include=['number']).shape[1], - 'NumberOfSymbolicFeatures': raw_df.select_dtypes(include=['object']).shape[1], + "NumberOfClasses": raw_df[self.target_col].nunique(), + "NumberOfFeatures": raw_df.shape[1], + "NumberOfInstances": raw_df.shape[0], + "NumberOfInstancesWithMissingValues": int(raw_df.isnull().any(axis=1).sum()), + "NumberOfMissingValues": int(raw_df.isnull().sum().sum()), + "NumberOfNumericFeatures": raw_df.select_dtypes(include=["number"]).shape[1], + "NumberOfSymbolicFeatures": raw_df.select_dtypes(include=["object"]).shape[1], } df_head_text = raw_df.head().to_string(index=False) @@ -193,10 +210,10 @@ 
class ExpDataset: "description": "", "target_col": self.target_col, "metadata": metadata, - "df_head": df_head_text + "df_head": df_head_text, } return dataset_info - + def get_metric(self): dataset_info = self.get_dataset_info() num_classes = dataset_info["metadata"]["NumberOfClasses"] @@ -216,7 +233,6 @@ class ExpDataset: return req def save_dataset(self, target_col): - df = self.get_raw_dataset() if not self.check_dataset_exists() or self.force_update: print(f"Saving Dataset {self.name} in {self.dataset_dir}") @@ -249,25 +265,22 @@ class ExpDataset: def split_and_save(self, df, target_col): if not target_col: raise ValueError("Target column not provided") - train, test = train_test_split(df, test_size=1-TRAIN_TEST_SPLIT, random_state=SEED) - train, dev = train_test_split(train, test_size=1-TRAIN_DEV_SPLIT, random_state=SEED) + train, test = train_test_split(df, test_size=1 - TRAIN_TEST_SPLIT, random_state=SEED) + train, dev = train_test_split(train, test_size=1 - TRAIN_DEV_SPLIT, random_state=SEED) self.save_split_datasets(train, "train") self.save_split_datasets(dev, "dev", target_col) self.save_split_datasets(test, "test", target_col) - - + class OpenMLExpDataset(ExpDataset): def __init__(self, name, dataset_dir, dataset_id, **kwargs): self.dataset_id = dataset_id - self.dataset = openml.datasets.get_dataset(self.dataset_id, - download_data=False, - download_qualities=False, - download_features_meta_data=True) + self.dataset = openml.datasets.get_dataset( + self.dataset_id, download_data=False, download_qualities=False, download_features_meta_data=True + ) self.name = self.dataset.name self.target_col = self.dataset.default_target_attribute super().__init__(self.name, dataset_dir, target_col=self.target_col, **kwargs) - def get_raw_dataset(self): dataset = self.dataset @@ -276,7 +289,7 @@ class OpenMLExpDataset(ExpDataset): os.makedirs(raw_dir, exist_ok=True) dataset_df.to_csv(Path(raw_dir, "train.csv"), index=False) return dataset_df - + def get_dataset_info(self): dataset_info = super().get_dataset_info() dataset = self.dataset @@ -290,12 +303,14 @@ class OpenMLExpDataset(ExpDataset): # def __init__(self, name, dataset_dir, dataset_name, **kwargs): # super().__init__(name, dataset_dir, **kwargs) + async def process_dataset(dataset, solution_designer, save_analysis_pool, datasets_dict): if save_analysis_pool: asyncio.run(solution_designer.generate_solutions(dataset.get_dataset_info(), dataset.name)) dataset_dict = create_dataset_dict(dataset) datasets_dict["datasets"][dataset.name] = dataset_dict + if __name__ == "__main__": datasets_dir = "D:/work/automl/datasets" force_update = False diff --git a/expo/evaluation/evaluation.py b/expo/evaluation/evaluation.py index 886bc036d..16b3acb71 100644 --- a/expo/evaluation/evaluation.py +++ b/expo/evaluation/evaluation.py @@ -1,5 +1,6 @@ -from sklearn.metrics import f1_score, accuracy_score, roc_auc_score, mean_squared_error import numpy as np +from sklearn.metrics import accuracy_score, f1_score, mean_squared_error, roc_auc_score + def evaluate_score(pred, gt, metric): if metric == "accuracy": @@ -20,4 +21,4 @@ def evaluate_score(pred, gt, metric): elif metric == "log rmse": return mean_squared_error(np.log1p(gt), np.log1p(pred), squared=False) else: - raise ValueError(f"Metric {metric} not supported") \ No newline at end of file + raise ValueError(f"Metric {metric} not supported") diff --git a/expo/evaluation/visualize_mcts.py b/expo/evaluation/visualize_mcts.py index 4199def0e..d310036c0 100644 --- a/expo/evaluation/visualize_mcts.py +++ 
b/expo/evaluation/visualize_mcts.py @@ -1,7 +1,7 @@ - -from expo.MCTS import Node, MCTS import textwrap +from expo.MCTS import Node + NODE_TEMPLATE = """\ [Node {id}] Plans: @@ -11,21 +11,23 @@ Score: {score}, Visits: {num_visits} """ + def get_role_plans(role): plans = role.planner.plan.tasks instruct_plans = [f"{i+1}. {task.instruction}" for i, task in enumerate(plans)] return instruct_plans -def get_tree_text(node : Node): +def get_tree_text(node: Node): role_dict = {} code_set = set() + def load_role(node): if node.id not in role_dict: role_dict[node.id] = node.load_role() return role_dict[node.id] - - def visualize_node(node : Node, previous_plans=None): + + def visualize_node(node: Node, previous_plans=None): role = load_role(node) node_id = node.id plans = role.planner.plan.tasks @@ -36,7 +38,9 @@ def get_tree_text(node : Node): simulated = role.state_saved score = f"avg score: {node.avg_value()}, simulated score: {node.raw_reward}" num_visits = node.visited - return NODE_TEMPLATE.format(id=node_id, plans=instruct_plans_text, simulated=simulated, score=score, num_visits=num_visits) + return NODE_TEMPLATE.format( + id=node_id, plans=instruct_plans_text, simulated=simulated, score=score, num_visits=num_visits + ) def visualize_tree(node, depth=0, previous_plans=None): text = "" @@ -46,11 +50,10 @@ def get_tree_text(node : Node): code_set.update({task.instruction for task in role.planner.plan.tasks}) previous_plans = get_role_plans(role) for child in node.children: - text += textwrap.indent(visualize_tree(child, depth+1, previous_plans), "\t") + text += textwrap.indent(visualize_tree(child, depth + 1, previous_plans), "\t") return text + num_simulations = node.visited text = f"Number of simulations: {num_simulations}\n" text += visualize_tree(node) return text, len(code_set) - - diff --git a/expo/experimenter/__init__.py b/expo/experimenter/__init__.py index 2eab295f7..e69de29bb 100644 --- a/expo/experimenter/__init__.py +++ b/expo/experimenter/__init__.py @@ -1,4 +0,0 @@ -from .experimenter import Experimenter -from .mcts import MCTSExperimenter -from .aug import AugExperimenter -from .custom import CustomExperimenter \ No newline at end of file diff --git a/expo/experimenter/aug.py b/expo/experimenter/aug.py index 86c98fd42..9b14123d3 100644 --- a/expo/experimenter/aug.py +++ b/expo/experimenter/aug.py @@ -1,9 +1,8 @@ from experimenter import Experimenter -from expo.MCTS import create_initial_state -from expo.dataset import generate_task_requirement -from expo.utils import mcts_logger, load_execute_notebook, get_exp_pool_path + from expo.insights.instruction_generator import InstructionGenerator from expo.research_assistant import ResearchAssistant +from expo.utils import get_exp_pool_path EXPS_PROMPT = """ When doing the tasks, you can refer to the insights below: @@ -12,10 +11,8 @@ When doing the tasks, you can refer to the insights below: """ - - class AugExperimenter(Experimenter): - result_path : str = "results/aug" + result_path: str = "results/aug" async def run_experiment(self): # state = create_initial_state(self.args.task, start_task_id=1, data_config=self.data_config, low_is_better=self.args.low_is_better, name="") @@ -31,7 +28,7 @@ class AugExperimenter(Experimenter): exps = [exp_set_text] * self.args.num_experiments else: raise ValueError(f"Invalid mode: {self.args.aug_mode}") - + results = [] for i in range(self.args.num_experiments): di = ResearchAssistant(node_id=str(i), use_reflection=self.args.reflection) @@ -39,20 +36,19 @@ class AugExperimenter(Experimenter): 
requirement = user_requirement + EXPS_PROMPT.format(experience=exps[i]) print(requirement) score_dict = await self.run_di(di, requirement) - results.append({ - "idx": i, - "score_dict": score_dict, - "aug_mode": self.args.aug_mode, - "insights" : exps[i], - "user_requirement": requirement, - "args": vars(self.args) - }) + results.append( + { + "idx": i, + "score_dict": score_dict, + "aug_mode": self.args.aug_mode, + "insights": exps[i], + "user_requirement": requirement, + "args": vars(self.args), + } + ) scores = [result["score_dict"]["test_score"] for result in results] avg_score = sum(scores) / len(scores) best_score = max(scores) if not self.args.low_is_better else min(scores) best_score_idx = scores.index(best_score) results.insert(0, {"avg_score": avg_score, "best_score": best_score, "best_score_idx": best_score_idx}) self.save_result(results) - - - \ No newline at end of file diff --git a/expo/experimenter/autogluon.py b/expo/experimenter/autogluon.py index d59fb3e83..4f5d151ef 100644 --- a/expo/experimenter/autogluon.py +++ b/expo/experimenter/autogluon.py @@ -1,13 +1,15 @@ -from expo.experimenter.custom import CustomExperimenter from autogluon.tabular import TabularDataset, TabularPredictor -class AGRunner(): +from expo.experimenter.custom import CustomExperimenter + + +class AGRunner: preset = "best_quality" time_limit = 500 def __init__(self, datasets): self.datasets = datasets - + def run(self): train_path = self.datasets["train"] test_wo_target_path = self.datasets["test_wo_target"] @@ -16,17 +18,16 @@ class AGRunner(): train_data = TabularDataset(train_path) test_data = TabularDataset(test_wo_target_path) dev_data = TabularDataset(dev_wo_target_path) - + predictor = TabularPredictor(label=target_col).fit(train_data, presets=self.preset, time_limit=self.time_limit) test_preds = predictor.predict(test_data) dev_preds = predictor.predict(dev_data) return {"test_preds": test_preds, "dev_preds": dev_preds} + class GluonExperimenter(CustomExperimenter): - result_path : str = "results/autogluon" + result_path: str = "results/autogluon" def __init__(self, args, **kwargs): super().__init__(args, **kwargs) self.framework = AGRunner(self.datasets) - - diff --git a/expo/experimenter/custom.py b/expo/experimenter/custom.py index c3cb97b9c..ba009bdb0 100644 --- a/expo/experimenter/custom.py +++ b/expo/experimenter/custom.py @@ -1,21 +1,26 @@ -from expo.experimenter import Experimenter -from expo.MCTS import create_initial_state -from expo.evaluation.evaluation import evaluate_score -import pandas as pd import os +import pandas as pd + +from expo.evaluation.evaluation import evaluate_score +from expo.experimenter import Experimenter +from expo.MCTS import create_initial_state + + class CustomExperimenter(Experimenter): - result_path : str = "results/custom" - + result_path: str = "results/custom" + def __init__(self, args, **kwargs): super().__init__(args, **kwargs) - self.framework = kwargs["framework"] # todo + self.framework = kwargs["framework"] # todo self.task = kwargs.get("task", self.args.task) self.low_is_better = kwargs.get("low_is_better", self.args.low_is_better) self.name = kwargs.get("name", "") self.result_path = f"results/custom_{self.name}" - self.state = create_initial_state(self.task, start_task_id=1, data_config=self.data_config, low_is_better=self.low_is_better, name=self.name) - + self.state = create_initial_state( + self.task, start_task_id=1, data_config=self.data_config, low_is_better=self.low_is_better, name=self.name + ) + def run_experiment(self): 
user_requirement = self.state["requirement"] preds = self.framework.run(user_requirement) @@ -23,13 +28,9 @@ class CustomExperimenter(Experimenter): dev_preds = preds["dev_preds"] score_dict = { "dev_score": self.evaluate_predictions(dev_preds, "dev"), - "test_score": self.evaluate_predictions(test_preds, "test") - } - results = { - "score_dict": score_dict, - "user_requirement": user_requirement, - "args": vars(self.args) + "test_score": self.evaluate_predictions(test_preds, "test"), } + results = {"score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)} self.save_result(results) def evaluate_pred_files(self, dev_pred_path, test_pred_path): @@ -37,7 +38,7 @@ class CustomExperimenter(Experimenter): test_preds = pd.read_csv(test_pred_path)["target"] score_dict = { "dev_score": self.evaluate_score(dev_preds, "dev"), - "test_score": self.evaluate_score(test_preds, "test") + "test_score": self.evaluate_score(test_preds, "test"), } return score_dict @@ -46,8 +47,7 @@ class CustomExperimenter(Experimenter): gt_path = os.path.join(self.state["datasets_dir"][f"{split}_target"]) gt = pd.read_csv(gt_path)["target"] score = evaluate_score(preds, gt, metric) - return score - + return score def load_datasets(self): train_path = self.state["datasets_dir"]["train"] @@ -57,4 +57,3 @@ class CustomExperimenter(Experimenter): dev = pd.read_csv(dev_path) test = pd.read_csv(test_path) return train, dev, test - diff --git a/expo/experimenter/experimenter.py b/expo/experimenter/experimenter.py index 709eefdfc..83dde80b9 100644 --- a/expo/experimenter/experimenter.py +++ b/expo/experimenter/experimenter.py @@ -1,23 +1,29 @@ -from expo.utils import DATA_CONFIG -import os -import pandas as pd -from expo.evaluation.evaluation import evaluate_score import datetime import json +import os + +import pandas as pd + +from expo.evaluation.evaluation import evaluate_score from expo.MCTS import create_initial_state from expo.research_assistant import ResearchAssistant +from expo.utils import DATA_CONFIG class Experimenter: - result_path : str = "results/base" + result_path: str = "results/base" data_config = DATA_CONFIG - def __init__(self, args, **kwargs): self.args = args self.start_time = datetime.datetime.now().strftime("%Y%m%d%H%M") - self.state = create_initial_state(self.args.task, start_task_id=1, data_config=self.data_config, low_is_better=self.args.low_is_better, name="") - + self.state = create_initial_state( + self.args.task, + start_task_id=1, + data_config=self.data_config, + low_is_better=self.args.low_is_better, + name="", + ) async def run_di(self, di, user_requirement): max_retries = 3 @@ -33,14 +39,8 @@ class Experimenter: print(f"Error: {e}") num_runs += 1 if not run_finished: - score_dict = { - "train_score": -1, - "dev_score": -1, - "test_score": -1, - "score": -1 - } + score_dict = {"train_score": -1, "dev_score": -1, "test_score": -1, "score": -1} return score_dict - async def run_experiment(self): state = self.state @@ -50,28 +50,28 @@ class Experimenter: for i in range(self.args.num_experiments): di = ResearchAssistant(node_id="0", use_reflection=self.args.reflection) score_dict = await self.run_di(di, user_requirement) - results.append({ - "idx": i, - "score_dict": score_dict, - "user_requirement": user_requirement, - "args": vars(self.args) - }) - self.save_result(results) # save intermediate results + results.append( + {"idx": i, "score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)} + ) + self.save_result(results) # save 
intermediate results dev_scores = [result["score_dict"]["dev_score"] for result in results] best_dev_score = max(dev_scores) if not self.args.low_is_better else min(dev_scores) best_score_idx = dev_scores.index(best_dev_score) - + test_scores = [result["score_dict"]["test_score"] for result in results] avg_score = sum(test_scores) / len(test_scores) global_best_score = max(test_scores) if not self.args.low_is_better else min(test_scores) - results.insert(0, { - "best_dev_score": best_dev_score, - "best_score_idx": best_score_idx, - "best_test_score": test_scores[best_score_idx], - "avg_test_score": avg_score, - "best_score": global_best_score - }) + results.insert( + 0, + { + "best_dev_score": best_dev_score, + "best_score_idx": best_score_idx, + "best_test_score": test_scores[best_score_idx], + "avg_test_score": avg_score, + "best_score": global_best_score, + }, + ) self.save_result(results) def evaluate_prediction(self, split, state): @@ -85,7 +85,7 @@ class Experimenter: metric = state["dataset_config"]["metric"] os.remove(pred_path) return evaluate_score(preds, gt, metric) - + def evaluate(self, score_dict, state): scores = { "dev_score": self.evaluate_prediction("dev", state), @@ -94,13 +94,12 @@ class Experimenter: score_dict.update(scores) return score_dict - def save_result(self, result): end_time = datetime.datetime.now().strftime("%Y%m%d%H%M") time_info = { "start_time": self.start_time, "end_time": end_time, - "duration (minutes)": float(end_time) - float(self.start_time) + "duration (minutes)": float(end_time) - float(self.start_time), } result = result.copy() result.insert(0, time_info) diff --git a/expo/experimenter/mcts.py b/expo/experimenter/mcts.py index e41f94d58..921b81412 100644 --- a/expo/experimenter/mcts.py +++ b/expo/experimenter/mcts.py @@ -1,22 +1,25 @@ -from expo.experimenter import Experimenter -from expo.dataset import generate_task_requirement -from expo.MCTS import MCTS from expo.evaluation.visualize_mcts import get_tree_text +from expo.experimenter import Experimenter +from expo.MCTS import MCTS class MCTSExperimenter(Experimenter): - result_path : str = "results/mcts" + result_path: str = "results/mcts" + async def run_experiment(self): mcts = MCTS(root_node=None, max_depth=5) - best_nodes = await mcts.search(self.args.task, self.data_config, - low_is_better=self.args.low_is_better, - load_tree=self.args.load_tree, - reflection=self.args.reflection, - rollouts=self.args.rollouts, - name=self.args.name) + best_nodes = await mcts.search( + self.args.task, + self.data_config, + low_is_better=self.args.low_is_better, + load_tree=self.args.load_tree, + reflection=self.args.reflection, + rollouts=self.args.rollouts, + name=self.args.name, + ) best_node = best_nodes["global_best"] dev_best_node = best_nodes["dev_best"] - + text, num_generated_codes = get_tree_text(mcts.root_node) text += f"Generated {num_generated_codes} unique codes.\n" text += f"Best node: {best_node}, score: {best_node.raw_reward}\n" @@ -24,22 +27,21 @@ class MCTSExperimenter(Experimenter): print(text) self.save_tree(text) - results = [{ - "best_node": best_node.id, - "best_node_score": best_node.raw_reward, - "dev_best_node": dev_best_node.id, - "dev_best_node_score": dev_best_node.raw_reward, - "num_generated_codes": num_generated_codes, - "user_requirement": best_node.state["requirement"], - "tree_text": text, - "args": vars(self.args) - }] + results = [ + { + "best_node": best_node.id, + "best_node_score": best_node.raw_reward, + "dev_best_node": dev_best_node.id, + "dev_best_node_score": 
dev_best_node.raw_reward, + "num_generated_codes": num_generated_codes, + "user_requirement": best_node.state["requirement"], + "tree_text": text, + "args": vars(self.args), + } + ] self.save_result(results) - - def save_tree(self, tree_text): fpath = f"{self.result_path}/{self.args.task}_tree_{self.args.name}.txt" with open(fpath, "w") as f: f.write(tree_text) - diff --git a/expo/insights/instruction_generator.py b/expo/insights/instruction_generator.py index 4f4155ff8..065565c89 100644 --- a/expo/insights/instruction_generator.py +++ b/expo/insights/instruction_generator.py @@ -1,3 +1,10 @@ +import json +import random + +from expo.utils import clean_json_from_rsp, load_data_config, mcts_logger +from metagpt.llm import LLM +from metagpt.schema import Message + REFLECTION_SYSTEM_MSG = "As a Kaggle grandmaster participating in a competition, you need to analyze your experience and propose evolutionary points that are more likely to improve the performance of baseline code." CHANGE_INSTRUCTION = """ @@ -18,12 +25,6 @@ Rewrite the original instruction according to the insights ``` """ -import re -import random -import json -from metagpt.llm import LLM -from metagpt.schema import Message -from expo.utils import load_data_config, mcts_logger, clean_json_from_rsp DATA_CONFIG = load_data_config() @@ -31,7 +32,7 @@ class InstructionGenerator: data_config = DATA_CONFIG @staticmethod - def load_json_data(json_dir): + def load_json_data(json_dir): with open(json_dir, "r") as file: json_data = json.load(file) return json_data @@ -39,7 +40,7 @@ class InstructionGenerator: @staticmethod def _random_sample(analysis, num_samples): return random.sample(analysis, num_samples) - + @staticmethod def sample_instruction_set(data): data_dict = {} @@ -52,12 +53,12 @@ class InstructionGenerator: for task_id in sorted(data_dict.keys()): instruction_set.append(random.choice(data_dict[task_id])) return instruction_set - + @staticmethod def format_output(rsp): rsp_list = [] - new_data = [] - rsp_list.append(rsp) + new_data = [] + rsp_list.append(rsp) for item in rsp_list: item_dict = json.loads(item) data = { @@ -83,21 +84,19 @@ class InstructionGenerator: new_instructions = [] if len(data) == 0: mcts_logger.log("MCTS", f"No insights available for task {task_id}") - return [original_instruction] # Return the original instruction if no insights are available + return [original_instruction] # Return the original instruction if no insights are available for item in data[:max_num]: insights = item["Analysis"] new_instruction = await InstructionGenerator.generate_new_instruction(original_instruction, insights) new_instructions.append(new_instruction) return new_instructions - + @staticmethod async def generate_new_instruction(original_instruction, insights): prompt = CHANGE_INSTRUCTION.format(instruction=original_instruction, insights=insights) llm = LLM() - context = llm.format_msg([Message(content=prompt, role="user")]) - llm_response = await llm.aask( - context, system_msgs=[REFLECTION_SYSTEM_MSG] - ) + context = llm.format_msg([Message(content=prompt, role="user")]) + llm_response = await llm.aask(context, system_msgs=[REFLECTION_SYSTEM_MSG]) rsp = clean_json_from_rsp(llm_response) new_instruction = json.loads(rsp)["New Instruction"] - return new_instruction \ No newline at end of file + return new_instruction diff --git a/expo/insights/solution_designer.py b/expo/insights/solution_designer.py index e2bf57ae3..fc05afeea 100644 --- a/expo/insights/solution_designer.py +++ b/expo/insights/solution_designer.py @@ -1,10 
+1,7 @@ -import re -import random import json -from metagpt.llm import LLM -from metagpt.schema import Message -from expo.utils import clean_json_from_rsp, load_data_config +from expo.utils import clean_json_from_rsp, load_data_config +from metagpt.llm import LLM DATA_CONFIG = load_data_config() @@ -72,56 +69,50 @@ Make sure each method is independent and can be implemented separately. """ KEY_DATASET_FEATURES = [ - 'NumberOfClasses', - 'NumberOfFeatures', - 'NumberOfInstances', - 'NumberOfInstancesWithMissingValues', - 'NumberOfMissingValues', - 'NumberOfNumericFeatures', - 'NumberOfSymbolicFeatures' + "NumberOfClasses", + "NumberOfFeatures", + "NumberOfInstances", + "NumberOfInstancesWithMissingValues", + "NumberOfMissingValues", + "NumberOfNumericFeatures", + "NumberOfSymbolicFeatures", ] -TASK_TO_ID = { - "EDA": 1, - "Data Preprocessing": 2, - "Feature Engineering": 3, - "Model Training": 4, - "Model Evaluation": 5 -} +TASK_TO_ID = {"EDA": 1, "Data Preprocessing": 2, "Feature Engineering": 3, "Model Training": 4, "Model Evaluation": 5} + class SolutionDesigner: - data_dir : str= DATA_CONFIG["datasets_dir"] + data_dir: str = DATA_CONFIG["datasets_dir"] async def generate_solutions(self, dataset_info, dataset_name): llm = LLM() - context = DATASET_INSIGHT_PROMPT.format(dataset=dataset_info["description"], - metadata=self.metadata_builder(dataset_info["metadata"]), - head=dataset_info["df_head"]) + context = DATASET_INSIGHT_PROMPT.format( + dataset=dataset_info["description"], + metadata=self.metadata_builder(dataset_info["metadata"]), + head=dataset_info["df_head"], + ) rsp = await llm.aask(context) rsp = clean_json_from_rsp(rsp) analysis_pool = self.process_analysis_pool(json.loads(rsp)) dataset_path = f"{self.data_dir}/{dataset_name}" self.save_analysis_pool(dataset_path, analysis_pool) - - + def process_analysis_pool(self, insights_rsp): analysis_pool = [] for task_type_insights in insights_rsp: task_type = task_type_insights["task_type"] for insight in task_type_insights["insights"]: - analysis_pool.append({"Analysis": insight, "Category": task_type, "task_id": TASK_TO_ID[task_type]}) + analysis_pool.append({"Analysis": insight, "Category": task_type, "task_id": TASK_TO_ID[task_type]}) return analysis_pool - def metadata_builder(self, qualities): metadata = {} for key in KEY_DATASET_FEATURES: metadata[key] = qualities.get(key, "N/A") metadata_text = json.dumps(metadata, indent=4) return metadata_text - + def save_analysis_pool(self, dataset_path, analysis_pool): fpath = f"{dataset_path}/ds_analysis_pool.json" with open(fpath, "w") as file: json.dump(analysis_pool, file, indent=4) - \ No newline at end of file diff --git a/expo/research_assistant.py b/expo/research_assistant.py index ed935b4b8..b21fc1a55 100644 --- a/expo/research_assistant.py +++ b/expo/research_assistant.py @@ -1,19 +1,16 @@ from __future__ import annotations import json +import os + +from pydantic import model_validator + +from expo.utils import mcts_logger, save_notebook +from metagpt.actions.di.write_analysis_code import WriteAnalysisCode +from metagpt.const import SERDESER_PATH from metagpt.roles.di.data_interpreter import DataInterpreter from metagpt.schema import Message, Task, TaskResult -from metagpt.strategy.task_type import TaskType -from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender -from metagpt.utils.common import CodeParser -from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info -from metagpt.const import MESSAGE_ROUTE_TO_ALL, SERDESER_PATH 
-from expo.utils import mcts_logger, save_notebook -from pydantic import Field, model_validator -from metagpt.actions.di.write_analysis_code import CheckData, WriteAnalysisCode - -import re -import os +from metagpt.utils.common import CodeParser, write_json_file EXTRACT_SCORE_PROMPT = """ # Code: @@ -36,39 +33,48 @@ If you cannot find the scores, please still return a dictionary with the keys 't ``` """ + class ResearchAssistant(DataInterpreter): node_id: str = "0" start_task_id: int = 1 - state_saved : bool = False - role_dir : str = SERDESER_PATH.joinpath("team", "environment", "roles", f"Experimenter") + state_saved: bool = False + role_dir: str = SERDESER_PATH.joinpath("team", "environment", "roles", "Experimenter") def get_node_name(self): return f"Node-{self.node_id}" - + def get_next_instruction(self): return self.planner.plan.tasks[self.start_task_id] - + def change_next_instruction(self, new_instruction): if new_instruction is not None: self.planner.plan.task_map[str(self.start_task_id)].instruction = new_instruction self.remap_tasks() - def update_til_start_task(self, role: ResearchAssistant, backward: bool = True): if backward: # make sure the previous task instructions are matched - assert self.start_task_id == role.start_task_id - 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}" + assert ( + self.start_task_id == role.start_task_id - 1 + ), f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}" for i in range(self.start_task_id): - if self.planner.plan.task_map[str(self.start_task_id)].instruction != role.planner.plan.task_map[str(self.start_task_id)].instruction: + if ( + self.planner.plan.task_map[str(self.start_task_id)].instruction + != role.planner.plan.task_map[str(self.start_task_id)].instruction + ): mcts_logger.info("Previous task instructions not matched") self.remap_tasks() return # copy new role's task (self.start_task_id) to current role - self.planner.plan.task_map[str(self.start_task_id)] = role.planner.plan.task_map[str(self.start_task_id)].model_copy() + self.planner.plan.task_map[str(self.start_task_id)] = role.planner.plan.task_map[ + str(self.start_task_id) + ].model_copy() self.remap_tasks() else: - assert self.start_task_id == role.start_task_id + 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}" + assert ( + self.start_task_id == role.start_task_id + 1 + ), f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}" if int(role.planner.plan.current_task_id) > self.start_task_id: for i in range(role.start_task_id): self.planner.plan.task_map[str(i)] = role.planner.plan.task_map[str(i)].model_copy() @@ -86,11 +92,10 @@ class ResearchAssistant(DataInterpreter): json_block = CodeParser.parse_code(block=None, text=rsp) score_dict = json.loads(json_block) return score_dict - @model_validator(mode="after") def set_plan_and_tool(self) -> "Interpreter": - if self.planner.plan.goal != '': + if self.planner.plan.goal != "": self.set_actions([WriteAnalysisCode]) self._set_state(0) print("Plan already exists, skipping initialization.") @@ -116,17 +121,17 @@ class ResearchAssistant(DataInterpreter): self.state_saved = True mcts_logger.log("MCTS", f"Saving state at task {self.start_task_id}") else: - mcts_logger.log("MCTS", f"Static Saving") + mcts_logger.log("MCTS", "Static Saving") stg_path = self.role_dir name = self.get_node_name() role_path = os.path.join(stg_path, f"{name}.json") # 将状态保存为 JSON 文件 write_json_file(role_path, self.model_dump()) - def 
remap_tasks(self): - self.planner.plan.tasks = [self.planner.plan.task_map[task_id] for task_id in sorted(self.planner.plan.task_map.keys())] - + self.planner.plan.tasks = [ + self.planner.plan.task_map[task_id] for task_id in sorted(self.planner.plan.task_map.keys()) + ] async def run(self, with_message=None) -> Message | None: """Observe, and think and act based on the results of the observation""" @@ -138,13 +143,9 @@ class ResearchAssistant(DataInterpreter): self.rc.working_memory.clear() self.working_memory.clear() # self.rc.todo = WriteAnalysisCode() - rsp = await self.react() + rsp = await self.react() # 发送响应消息给 Environment 对象,以便它将消息传递给订阅者 self.set_todo(None) self.publish_message(rsp) return rsp return await super().run(with_message) - - - - \ No newline at end of file diff --git a/expo/run_exp_augmentation.py b/expo/run_exp_augmentation.py index 3f8eff3b3..7fb174ff7 100644 --- a/expo/run_exp_augmentation.py +++ b/expo/run_exp_augmentation.py @@ -1,15 +1,17 @@ -import os -from expo.research_assistant import ResearchAssistant +import argparse import asyncio -from expo.utils import DATA_CONFIG, get_exp_pool_path +import datetime +import json +import os + +import pandas as pd + from expo.dataset import generate_task_requirement +from expo.evaluation.evaluation import evaluate_score from expo.insights.instruction_generator import InstructionGenerator from expo.MCTS import create_initial_state -from expo.evaluation.evaluation import evaluate_score -import json -import argparse -import pandas as pd -import datetime +from expo.research_assistant import ResearchAssistant +from expo.utils import DATA_CONFIG, get_exp_pool_path EXPS_PROMPT = """ When doing the tasks, you can refer to the insights below: @@ -18,13 +20,14 @@ When doing the tasks, you can refer to the insights below: """ data_config = DATA_CONFIG + def evaluate_test(score, state): datetime_text = datetime.datetime.now().strftime("%Y%m%d%H%M") task_name = state["task"] prediction_fpath = os.path.join(state["work_dir"], task_name, "predictions.csv") predictions = pd.read_csv(prediction_fpath)["target"] # copy predictions.csv to the node_dir - + predictions_node_fpath = os.path.join("results", f"{task_name}-{datetime_text}-predictions.csv") predictions.to_csv(predictions_node_fpath, index=False) # load test_target.csv @@ -35,8 +38,6 @@ def evaluate_test(score, state): return score - - async def main(task_name, use_reflection=True, mode="single", num_experiments=2): """ mode: single or set @@ -44,8 +45,10 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2) set: sample a set of instructions """ low_is_better = False - state = create_initial_state(task_name, start_task_id=1, data_config=data_config, low_is_better=low_is_better, name="") - + state = create_initial_state( + task_name, start_task_id=1, data_config=data_config, low_is_better=low_is_better, name="" + ) + user_requirement = generate_task_requirement(task_name, data_config) exp_pool_path = get_exp_pool_path(task_name, data_config, pool_name="ds_analysis_pool") exp_pool = InstructionGenerator.load_analysis_pool(exp_pool_path) @@ -58,7 +61,7 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2) exps = [exp_set_text] * num_experiments else: raise ValueError(f"Invalid mode: {mode}") - + scores = [] for i in range(num_experiments): di = ResearchAssistant(node_id=str(i), use_reflection=use_reflection) @@ -70,16 +73,18 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2) score = 
evaluate_test(score, state) scores.append(score) - with open(f"results/{task_name}_scores.json", "w") as f: # save scores and corresponding insights - results = {"avg_score": sum([score["test_score"] for score in scores if score])/num_experiments, - "max_score": max([score["test_score"] for score in scores]), - "scores": scores, "insights": exps} + results = { + "avg_score": sum([score["test_score"] for score in scores if score]) / num_experiments, + "max_score": max([score["test_score"] for score in scores]), + "scores": scores, + "insights": exps, + } json.dump(results, f, indent=4) - - + + def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--task", type=str, default="titanic") @@ -90,8 +95,9 @@ def parse_args(): parser.add_argument("--num_experiments", type=int, default=2) return parser.parse_args() - if __name__ == "__main__": args = parse_args() - asyncio.run(main(args.task, use_reflection=args.use_reflection, mode=args.mode, num_experiments=args.num_experiments)) + asyncio.run( + main(args.task, use_reflection=args.use_reflection, mode=args.mode, num_experiments=args.num_experiments) + ) diff --git a/expo/run_experiment.py b/expo/run_experiment.py index 826019321..f8e58ce4f 100644 --- a/expo/run_experiment.py +++ b/expo/run_experiment.py @@ -1,6 +1,12 @@ -from expo.experimenter import MCTSExperimenter, Experimenter, AugExperimenter, CustomExperimenter -import asyncio import argparse +import asyncio + +from expo.experimenter import ( + AugExperimenter, + CustomExperimenter, + Experimenter, + MCTSExperimenter, +) def get_args(): @@ -19,6 +25,7 @@ def get_mcts_args(parser): parser.set_defaults(load_tree=False) parser.add_argument("--rollouts", type=int, default=5) + def get_aug_exp_args(parser): parser.add_argument("--aug_mode", type=str, default="single", choices=["single", "set"]) parser.add_argument("--num_experiments", type=int, default=1) @@ -31,7 +38,7 @@ def get_di_args(parser): parser.add_argument("--reflection", dest="reflection", action="store_true") parser.add_argument("--no_reflection", dest="reflection", action="store_false") parser.set_defaults(reflection=True) - + async def main(args): if args.exp_mode == "mcts": @@ -46,6 +53,7 @@ async def main(args): raise ValueError(f"Invalid exp_mode: {args.exp_mode}") await experimenter.run_experiment() + if __name__ == "__main__": args = get_args() - asyncio.run(main(args)) \ No newline at end of file + asyncio.run(main(args)) diff --git a/expo/run_mcts.py b/expo/run_mcts.py index 20d4171f7..4577417a9 100644 --- a/expo/run_mcts.py +++ b/expo/run_mcts.py @@ -1,10 +1,9 @@ -from expo.MCTS import MCTS, Node, initialize_di_root_node -from expo.utils import load_data_config -from expo.dataset import generate_task_requirement +import argparse +import asyncio from expo.evaluation.visualize_mcts import get_tree_text -import asyncio -import argparse +from expo.MCTS import MCTS +from expo.utils import load_data_config def get_args(): @@ -35,9 +34,17 @@ if __name__ == "__main__": # asyncio.run(root_node.run_node()) mcts = MCTS(root_node=None, max_depth=5) - best_nodes = asyncio.run(mcts.search(args.task, data_config, - low_is_better=args.low_is_better, load_tree=args.load_tree, - reflection=args.reflection, rollouts=args.rollouts, name=args.name)) + best_nodes = asyncio.run( + mcts.search( + args.task, + data_config, + low_is_better=args.low_is_better, + load_tree=args.load_tree, + reflection=args.reflection, + rollouts=args.rollouts, + name=args.name, + ) + ) best_node = best_nodes["global_best"] dev_best_node = 
best_nodes["dev_best"] text, num_generated_codes = get_tree_text(mcts.root_node) @@ -49,5 +56,3 @@ if __name__ == "__main__": f.write(f"Best node: {best_node}, score: {best_node.raw_reward}\n") f.write(f"Dev best node: {dev_best_node}, score: {dev_best_node.raw_reward}\n") f.write(text) - - diff --git a/expo/utils.py b/expo/utils.py index 20e3fa7f5..d67ceb5a1 100644 --- a/expo/utils.py +++ b/expo/utils.py @@ -1,50 +1,58 @@ -import yaml -from metagpt.roles.role import Role -from metagpt.actions.di.execute_nb_code import ExecuteNbCode -# from nbclient import NotebookClient -from nbformat.notebooknode import NotebookNode -import nbformat -from pathlib import Path -from loguru import logger as _logger -from datetime import datetime -import sys import os import re +import sys +from datetime import datetime +from pathlib import Path + +import nbformat +import yaml +from loguru import logger as _logger + +# from nbclient import NotebookClient +from nbformat.notebooknode import NotebookNode + +from metagpt.roles.role import Role + def load_data_config(file_path="data.yaml"): - with open(file_path, 'r') as stream: + with open(file_path, "r") as stream: data_config = yaml.safe_load(stream) return data_config + DATA_CONFIG = load_data_config() + def get_mcts_logger(): print_level = "INFO" print_level2 = "MCTS" - logfile_level="MCTS" + logfile_level = "MCTS" name: str = None current_date = datetime.now() formatted_date = current_date.strftime("%Y%m%d") log_name = f"{name}_{formatted_date}" if name else formatted_date # name a log with prefix name _logger.remove() - new_level = _logger.level(logfile_level, color="", no=25) + _logger.level(logfile_level, color="", no=25) _logger.add(sys.stderr, level=print_level) _logger.add(sys.stderr, level=print_level2) _logger.add(Path(DATA_CONFIG["work_dir"]) / DATA_CONFIG["role_dir"] / f"{log_name}.txt", level=logfile_level) _logger.propagate = False return _logger + mcts_logger = get_mcts_logger() def get_exp_pool_path(task_name, data_config, pool_name="analysis_pool"): - datasets_dir = data_config['datasets_dir'] - if task_name in data_config['datasets']: - dataset = data_config['datasets'][task_name] - data_path = os.path.join(datasets_dir, dataset['dataset']) + datasets_dir = data_config["datasets_dir"] + if task_name in data_config["datasets"]: + dataset = data_config["datasets"][task_name] + data_path = os.path.join(datasets_dir, dataset["dataset"]) else: - raise ValueError(f"Dataset {task_name} not found in config file. Available datasets: {data_config['datasets'].keys()}") + raise ValueError( + f"Dataset {task_name} not found in config file. 
Available datasets: {data_config['datasets'].keys()}" + ) exp_pool_path = os.path.join(data_path, f"{pool_name}.json") return exp_pool_path @@ -60,7 +68,6 @@ def change_plan(role, plan): if not finished: tasks[i].plan = plan return finished - def is_cell_to_delete(cell: NotebookNode) -> bool: @@ -82,12 +89,14 @@ def process_cells(nb: NotebookNode) -> NotebookNode: nb["cells"] = new_cells return nb + def save_notebook(role: Role, save_dir: str = "", name: str = ""): save_dir = Path(save_dir) nb = process_cells(role.execute_code.nb) file_path = save_dir / f"{name}.ipynb" nbformat.write(nb, file_path) + async def load_execute_notebook(role): tasks = role.planner.plan.tasks codes = [task.code for task in tasks if task.code] @@ -99,6 +108,7 @@ async def load_execute_notebook(role): print("Finish executing the loaded notebook") return executor + def clean_json_from_rsp(text): pattern = r"```json(.*?)```" matches = re.findall(pattern, text, re.DOTALL) @@ -106,4 +116,4 @@ def clean_json_from_rsp(text): json_str = "\n".join(matches) return json_str else: - return "" \ No newline at end of file + return ""