format code

Yizhou Chi 2024-09-04 17:52:02 +08:00
parent fcd1ba66a6
commit ab8a1d6824
17 changed files with 433 additions and 396 deletions

View file

@ -1,23 +1,28 @@
import random
import math
import os
import pandas as pd
from expo.research_assistant import ResearchAssistant
from expo.insights.instruction_generator import InstructionGenerator
from expo.dataset import get_split_dataset_path, generate_task_requirement
from expo.evaluation.evaluation import evaluate_score
from expo.utils import mcts_logger, load_execute_notebook, get_exp_pool_path
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info
import numpy as np
import pickle
import random
import pandas as pd
from expo.dataset import generate_task_requirement, get_split_dataset_path
from expo.evaluation.evaluation import evaluate_score
from expo.insights.instruction_generator import InstructionGenerator
from expo.research_assistant import ResearchAssistant
from expo.utils import get_exp_pool_path, load_execute_notebook, mcts_logger
from metagpt.tools.tool_recommend import ToolRecommender
from metagpt.utils.common import read_json_file
def initialize_di_root_node(task, data_config, low_is_better=False, reflection=True, name=""):
start_task_id = 2
state = create_initial_state(task, start_task_id=start_task_id, data_config=data_config, low_is_better=low_is_better, name=name)
role = ResearchAssistant(node_id="0", start_task_id=start_task_id, use_reflection=reflection, role_dir=state["node_dir"])
return role, Node(parent=None, state=state, action=None, value=0)
state = create_initial_state(
task, start_task_id=start_task_id, data_config=data_config, low_is_better=low_is_better, name=name
)
role = ResearchAssistant(
node_id="0", start_task_id=start_task_id, use_reflection=reflection, role_dir=state["node_dir"]
)
return role, Node(parent=None, state=state, action=None, value=0)
def create_initial_state(task, start_task_id, data_config, low_is_better, name):
@ -36,16 +41,16 @@ def create_initial_state(task, start_task_id, data_config, low_is_better, name):
return initial_state
class Node():
state : dict = {}
action : str = None
value : float = 0
visited : int = 0
children : list = []
normalized_reward : dict = {"train_score": 0, "dev_score": 0, "test_score": 0}
class Node:
state: dict = {}
action: str = None
value: float = 0
visited: int = 0
children: list = []
normalized_reward: dict = {"train_score": 0, "dev_score": 0, "test_score": 0}
parent = None
def __init__(self, parent=None, state = None, action=None, value = 0, max_depth=4, **kwargs):
def __init__(self, parent=None, state=None, action=None, value=0, max_depth=4, **kwargs):
self.state = state
self.action = action
self.value = value
@ -66,14 +71,14 @@ class Node():
def __hash__(self):
return hash(self.id)
def save_node(self):
os.makedirs(self.state["node_dir"], exist_ok=True)
with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'wb') as f:
os.makedirs(self.state["node_dir"], exist_ok=True)
with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), "wb") as f:
pickle.dump(self, f)
def load_node(self):
with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'rb') as f:
with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), "rb") as f:
return pickle.load(f)
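The save_node / load_node pair above persists each search node as a pickle file under the run's node_dir. A minimal round-trip sketch with a stand-in dict payload (the real code pickles whole Node objects; "nodes" is a placeholder directory):

import os
import pickle

node_dir = "nodes"  # placeholder; the project uses state["node_dir"]
os.makedirs(node_dir, exist_ok=True)
path = os.path.join(node_dir, "Node-0.pkl")

# Write the object, then read it back, mirroring save_node / load_node.
with open(path, "wb") as f:
    pickle.dump({"id": "0", "value": 0.8}, f)
with open(path, "rb") as f:
    print(pickle.load(f))  # {'id': '0', 'value': 0.8}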
def get_depth(self):
@ -94,14 +99,14 @@ class Node():
def is_terminal(self):
return int(self.state["start_task_id"]) == self.max_depth + 1
def is_fully_expanded(self):
return len(self.children) > 0
def add_child(self, child_node):
self.children.append(child_node)
def update(self, reward:dict, child_node=None):
def update(self, reward: dict, child_node=None):
if child_node is not None:
child_role = child_node.load_role()
role = self.load_role()
@ -117,46 +122,48 @@ class Node():
fname = f"Node-{self.id}.json"
role_path = os.path.join(self.state["node_dir"], fname)
return role_path
def load_role(self):
role_dict = read_json_file(self.get_role_path())
if role_dict.get('tool_recommender') is None:
role_dict['tool_recommender'] = ToolRecommender()
elif isinstance(role_dict.get('tool_recommender', {}).get('tools'), dict):
role_dict['tool_recommender']['tools'] = list(role_dict['tool_recommender']['tools'].keys())
if role_dict.get("tool_recommender") is None:
role_dict["tool_recommender"] = ToolRecommender()
elif isinstance(role_dict.get("tool_recommender", {}).get("tools"), dict):
role_dict["tool_recommender"]["tools"] = list(role_dict["tool_recommender"]["tools"].keys())
role = ResearchAssistant(**role_dict)
if self.parent is not None: # TODO: Check this
if self.parent is not None: # TODO: Check this
parent_role = self.parent.load_role()
role.update_til_start_task(parent_role, backward=False)
role.remap_tasks()
return role
def save_new_role(self, role: ResearchAssistant):
role.node_id = self.id
role.start_task_id = self.state['start_task_id']
role.start_task_id = self.state["start_task_id"]
role.state_saved = False
role.change_next_instruction(self.action)
role.change_next_instruction(self.action)
mcts_logger.log("MCTS", f"Saving new role: {role.node_id}")
role.save_state(static_save=True)
async def expand(self, max_children):
if self.is_fully_expanded():
return
insight_geneartor = InstructionGenerator()
role = self.load_role()
original_instruction = role.get_next_instruction()
insights = await insight_geneartor.generate_new_instructions(task_id=role.start_task_id + 1,
original_instruction=original_instruction,
max_num=max_children,
file_path=self.state["exp_pool_path"])
insights = await insight_geneartor.generate_new_instructions(
task_id=role.start_task_id + 1,
original_instruction=original_instruction,
max_num=max_children,
file_path=self.state["exp_pool_path"],
)
new_state = self.state.copy()
new_state['start_task_id'] += 1
new_state["start_task_id"] += 1
for insight in insights:
new_role = role.model_copy()
node = Node(parent=self, state=new_state, action=insight, value=0)
node.save_new_role(new_role)
self.add_child(node)
# def evaluate_test(self):
# prediction_fpath = os.path.join(self.state["work_dir"], self.state["task"], "predictions.csv")
# predictions = pd.read_csv(prediction_fpath)["target"]
@ -168,7 +175,7 @@ class Node():
# gt = pd.read_csv(os.path.join(split_datasets_dir["test_target"]))["target"]
# metric = self.state["dataset_config"]["metric"]
# return evaluate_score(predictions, gt, metric)
def evaluate_prediction(self, split):
pred_path = os.path.join(self.state["work_dir"], self.state["task"], f"{split}_predictions.csv")
pred_node_path = os.path.join(self.state["node_dir"], f"Node-{self.id}-{split}_predictions.csv")
@ -180,21 +187,17 @@ class Node():
# remove original predictions.csv
os.remove(pred_path)
return evaluate_score(preds, gt, metric)
def evaluate_simulation(self, score_dict):
scores = {
"dev_score": self.evaluate_prediction("dev"),
"test_score": self.evaluate_prediction("test")
}
scores = {"dev_score": self.evaluate_prediction("dev"), "test_score": self.evaluate_prediction("test")}
score_dict.update(scores)
return score_dict
async def run_node(self, role=None):
if self.is_terminal() and role is not None:
if role.state_saved:
return self.raw_reward
max_retries = 3
num_runs = 1
run_finished = False
@ -202,10 +205,10 @@ class Node():
try:
if not role:
role = self.load_role()
await load_execute_notebook(role) # execute previous notebook's code
await role.run(with_message='continue')
await load_execute_notebook(role) # execute previous notebook's code
await role.run(with_message="continue")
else:
await role.run(with_message=self.state['requirement'])
await role.run(with_message=self.state["requirement"])
run_finished = True
except Exception as e:
mcts_logger.log("MCTS", f"Error in running the role: {e}")
@ -222,18 +225,19 @@ class Node():
if score == -1:
return 0
return 1 / (1 + score)
score_dict = {k: normalize_score(v) for k, v in score_dict.items()}
self.normalized_reward = score_dict
return score_dict
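The normalization above turns a raw metric into a reward the tree can maximize: a failed run (score == -1) contributes zero, and otherwise the score is mapped to 1 / (1 + score), so for error-style metrics a lower raw value yields a higher reward. A standalone sketch with made-up scores:

def normalize_score(score):
    # A failed run is reported as -1 and contributes no reward.
    if score == -1:
        return 0
    # Lower raw error maps to a higher reward in (0, 1].
    return 1 / (1 + score)

raw = {"train_score": 0.25, "dev_score": 0.4, "test_score": -1}
print({k: normalize_score(v) for k, v in raw.items()})
# {'train_score': 0.8, 'dev_score': 0.714..., 'test_score': 0}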
class MCTS():
#data_path
root_node : Node = None
children : dict = {}
max_depth : int = 5
c_explore : float = 1.4
c_unvisited : float = 0.8
class MCTS:
# data_path
root_node: Node = None
children: dict = {}
max_depth: int = 5
c_explore: float = 1.4
c_unvisited: float = 0.8
def __init__(self, root_node, max_depth):
self.root_node = root_node
@ -243,34 +247,34 @@ class MCTS():
node = self.best_child()
mcts_logger.log("MCTS", f"Selected node id: {node.id}")
return node
def best_child(self):
def uct(node: Node):
n_visits = node.visited if node.visited else self.c_unvisited
avg_value = node.avg_value() if node.visited else node.value/self.c_unvisited
avg_value = node.avg_value() if node.visited else node.value / self.c_unvisited
return avg_value + self.c_explore * math.sqrt(math.log(node.parent.visited) / n_visits)
if len(self.children) == 0:
return self.root_node
all_children = [child for children in self.children.values() for child in children]
return max(all_children, key=uct)
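best_child above ranks every expanded node with a UCT score: the node's average value (exploitation) plus c_explore * sqrt(ln(parent visits) / visits), with c_unvisited standing in for the visit count of unexplored nodes. A simplified standalone sketch, using plain dicts instead of the project's Node class and keeping only the visited-node branch of the value term:

import math

def uct(avg_value, visits, parent_visits, c_explore=1.4, c_unvisited=0.8):
    # Unvisited nodes get a small pseudo-count so they still look attractive.
    n = visits if visits else c_unvisited
    return avg_value + c_explore * math.sqrt(math.log(parent_visits) / n)

children = [{"avg": 0.60, "visits": 3}, {"avg": 0.55, "visits": 1}]
best = max(children, key=lambda c: uct(c["avg"], c["visits"], parent_visits=4))
print(best)  # the rarely visited child wins thanks to the exploration bonus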
async def expand(self, node : Node, max_children=5):
async def expand(self, node: Node, max_children=5):
await node.expand(max_children)
if node not in self.children or not self.children[node]:
self.children[node] = node.children
return node.children
async def simulate(self, node : Node, role=None):
async def simulate(self, node: Node, role=None):
"Returns the reward for a random simulation (to completion) of `node`"
mcts_logger.log("MCTS", f"Start simulating node {node.id}:")
while node.children:
node = random.choice(node.children)
reward = await node.run_node(role)
mcts_logger.log("MCTS", f"Simulated node's reward: {reward}")
mcts_logger.log("MCTS", f"Simulated node's reward: {reward}")
return reward
def backpropagate(self, node : Node, reward):
def backpropagate(self, node: Node, reward):
child_node = node
node.update(reward)
node = node.parent
@ -278,10 +282,11 @@ class MCTS():
node.update(reward, child_node)
node, child_node = node.parent, node
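Backpropagation credits the simulated reward to the node and every ancestor up to the root, so parents accumulate the visit counts and values UCT relies on. A minimal sketch with a toy node type (the project's Node.update additionally reconciles role state through the child node):

class ToyNode:
    def __init__(self, parent=None):
        self.parent = parent
        self.value = 0.0
        self.visited = 0

    def update(self, reward):
        self.value += reward
        self.visited += 1

def backpropagate(node, reward):
    # Walk up the tree until the root's parent (None) is reached.
    while node is not None:
        node.update(reward)
        node = node.parent

root = ToyNode()
leaf = ToyNode(parent=ToyNode(parent=root))
backpropagate(leaf, 0.7)
print(root.visited, root.value)  # 1 0.7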
def best_path(self, root : Node):
def best_path(self, root: Node):
best_child = root
best_score = 0
def bfs(node : Node, best_score, best_child : Node, split):
def bfs(node: Node, best_score, best_child: Node, split):
assert split in ["test_score", "dev_score"]
if node not in self.children:
return best_score, best_child
@ -293,19 +298,19 @@ class MCTS():
best_child = child
best_score, best_child = bfs(child, best_score, best_child, split)
return best_score, best_child
_, best_child = bfs(root, best_score, best_child, "test_score")
_, dev_best_child = bfs(root, best_score, best_child, "dev_score")
return {"dev_best": dev_best_child,
"global_best": best_child}
return {"dev_best": dev_best_child, "global_best": best_child}
def get_num_simulations(self):
return self.root_node.visited
async def search(self, task, data_config, name,
rollouts, load_tree=False, low_is_better=False, reflection=False):
role, root = initialize_di_root_node(task, data_config, low_is_better=low_is_better, reflection=reflection, name=name)
async def search(self, task, data_config, name, rollouts, load_tree=False, low_is_better=False, reflection=False):
role, root = initialize_di_root_node(
task, data_config, low_is_better=low_is_better, reflection=reflection, name=name
)
self.root_node = root
tree_loaded = False
if load_tree:
@ -314,14 +319,14 @@ class MCTS():
mcts_logger.log("MCTS", f"Tree loaded: {tree_loaded}")
if not tree_loaded:
rollouts -= 2 # 2 rollouts for the initial tree
rollouts -= 2 # 2 rollouts for the initial tree
if rollouts < 0:
raise ValueError("Rollouts must be greater than 2 if there is no tree to load")
self.children[root] = []
reward = await self.simulate(root, role)
self.backpropagate(root, reward)
children = await self.expand(root)
#currently picks one child at random; this could later be extended to multiple
# currently picks one child at random; this could later be extended to multiple
first_leaf = random.choice(children)
reward = await self.simulate(first_leaf)
self.backpropagate(first_leaf, reward)
@ -339,14 +344,13 @@ class MCTS():
mcts_logger.log("MCTS", f"Terminal node's reward: {reward}")
self.backpropagate(node, reward)
else:
if node.visited > 0:
if node.visited > 0:
children = await self.expand(node)
node = random.choice(children)
reward = await self.simulate(node)
self.backpropagate(node, reward)
return self.best_path(root)
def load_tree(self):
def load_children_node(node):
mcts_logger.log("MCTS", f"Load node {node.id}'s child: {node.children}")
@ -356,14 +360,15 @@ class MCTS():
child.load_node()
self.children[child] = child.children
load_children_node(child)
# Load all pkl files in the node_dir
all_pkl_files = os.listdir(self.root_node.state["node_dir"])
all_pkl_files = [f for f in all_pkl_files if f.endswith(".pkl")]
if os.path.exists(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl")):
with open(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl"), 'rb') as f:
with open(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl"), "rb") as f:
self.root_node = pickle.load(f)
self.children[self.root_node] = self.root_node.children
load_children_node(self.root_node)
if self.children:
return True
return False
return False

View file

@ -1,12 +1,14 @@
import openml
from pathlib import Path
from sklearn.model_selection import train_test_split
import os
import json
import yaml
import pandas as pd
from expo.insights.solution_designer import SolutionDesigner
import asyncio
import json
import os
from pathlib import Path
import openml
import pandas as pd
import yaml
from sklearn.model_selection import train_test_split
from expo.insights.solution_designer import SolutionDesigner
BASE_USER_REQUIREMENT = """\
This is a {datasetname} dataset. Your goal is to predict the target column `{target_col}`.
@ -59,14 +61,12 @@ OPENML_DATASET_IDS = [
41980,
42225,
531,
# cls
41143,
31,
42733,
41162,
1067,
# multi cls
40498,
40982,
@ -79,14 +79,15 @@ CUSTOM_DATASETS = [
("04_titanic", "Survived"),
("05_house-prices-advanced-regression-techniques", "SalePrice"),
("06_santander-customer-transaction-prediction", "target"),
("07_icr-identify-age-related-conditions", "Class")
("07_icr-identify-age-related-conditions", "Class"),
]
def get_split_dataset_path(dataset_name, config):
datasets_dir = config['datasets_dir']
if dataset_name in config['datasets']:
dataset = config['datasets'][dataset_name]
data_path = os.path.join(datasets_dir, dataset['dataset'])
datasets_dir = config["datasets_dir"]
if dataset_name in config["datasets"]:
dataset = config["datasets"][dataset_name]
data_path = os.path.join(datasets_dir, dataset["dataset"])
split_datasets = {
"train": os.path.join(data_path, "split_train.csv"),
"dev": os.path.join(data_path, "split_dev.csv"),
@ -98,32 +99,39 @@ def get_split_dataset_path(dataset_name, config):
}
return split_datasets
else:
raise ValueError(f"Dataset {dataset_name} not found in config file. Available datasets: {config['datasets'].keys()}")
raise ValueError(
f"Dataset {dataset_name} not found in config file. Available datasets: {config['datasets'].keys()}"
)
def get_user_requirement(task_name, config):
datasets_dir = config['datasets_dir']
if task_name in config['datasets']:
dataset = config['datasets'][task_name]
data_path = os.path.join(datasets_dir, dataset['dataset'])
user_requirement = dataset['user_requirement']
datasets_dir = config["datasets_dir"]
if task_name in config["datasets"]:
dataset = config["datasets"][task_name]
data_path = os.path.join(datasets_dir, dataset["dataset"])
user_requirement = dataset["user_requirement"]
return data_path, user_requirement
else:
raise ValueError(f"Dataset {task_name} not found in config file. Available datasets: {config['datasets'].keys()}")
raise ValueError(
f"Dataset {task_name} not found in config file. Available datasets: {config['datasets'].keys()}"
)
def save_datasets_dict_to_yaml(datasets_dict):
with open("datasets.yaml", "w") as file:
yaml.dump(datasets_dict, file)
def create_dataset_dict(dataset):
dataset_dict = {
"dataset": dataset.name,
"user_requirement": dataset.create_base_requirement(),
"metric": dataset.get_metric(),
"target_col": dataset.target_col
"target_col": dataset.target_col,
}
return dataset_dict
def generate_task_requirement(task_name, data_config):
user_requirement = get_user_requirement(task_name, data_config)
split_dataset_path = get_split_dataset_path(task_name, data_config)
@ -132,19 +140,23 @@ def generate_task_requirement(task_name, data_config):
test_path = split_dataset_path["test_wo_target"]
work_dir = data_config["work_dir"]
output_dir = f"{work_dir}/{task_name}"
user_requirement = TASK_PROMPT.format(user_requirement=user_requirement,
train_path=train_path, dev_path=dev_path, test_path=test_path,
output_dir=output_dir)
user_requirement = TASK_PROMPT.format(
user_requirement=user_requirement,
train_path=train_path,
dev_path=dev_path,
test_path=test_path,
output_dir=output_dir,
)
print(user_requirement)
return user_requirement
class ExpDataset:
description : str = None
metadata : dict = None
dataset_dir : str = None
target_col : str = None
name : str = None
description: str = None
metadata: dict = None
dataset_dir: str = None
target_col: str = None
name: str = None
def __init__(self, name, dataset_dir, **kwargs):
self.name = name
@ -154,18 +166,23 @@ class ExpDataset:
self.save_dataset(target_col=self.target_col)
def check_dataset_exists(self):
fnames = ["split_train.csv", "split_dev.csv", "split_test.csv",
"split_dev_wo_target.csv", "split_dev_target.csv",
"split_test_wo_target.csv", "split_test_target.csv"]
fnames = [
"split_train.csv",
"split_dev.csv",
"split_test.csv",
"split_dev_wo_target.csv",
"split_dev_target.csv",
"split_test_wo_target.csv",
"split_test_target.csv",
]
for fname in fnames:
if not os.path.exists(Path(self.dataset_dir, self.name, fname)):
return False
return True
def check_datasetinfo_exists(self):
return os.path.exists(Path(self.dataset_dir, self.name, "dataset_info.json"))
def get_raw_dataset(self):
raw_dir = Path(self.dataset_dir, self.name, "raw")
if not os.path.exists(Path(raw_dir, "train.csv")):
@ -173,17 +190,17 @@ class ExpDataset:
else:
df = pd.read_csv(Path(raw_dir, "train.csv"))
return df
def get_dataset_info(self):
raw_df = pd.read_csv(Path(self.dataset_dir, self.name, "raw", "train.csv"))
metadata = {
'NumberOfClasses': raw_df[self.target_col].nunique(),
'NumberOfFeatures': raw_df.shape[1],
'NumberOfInstances': raw_df.shape[0],
'NumberOfInstancesWithMissingValues': int(raw_df.isnull().any(axis=1).sum()),
'NumberOfMissingValues': int(raw_df.isnull().sum().sum()),
'NumberOfNumericFeatures': raw_df.select_dtypes(include=['number']).shape[1],
'NumberOfSymbolicFeatures': raw_df.select_dtypes(include=['object']).shape[1],
"NumberOfClasses": raw_df[self.target_col].nunique(),
"NumberOfFeatures": raw_df.shape[1],
"NumberOfInstances": raw_df.shape[0],
"NumberOfInstancesWithMissingValues": int(raw_df.isnull().any(axis=1).sum()),
"NumberOfMissingValues": int(raw_df.isnull().sum().sum()),
"NumberOfNumericFeatures": raw_df.select_dtypes(include=["number"]).shape[1],
"NumberOfSymbolicFeatures": raw_df.select_dtypes(include=["object"]).shape[1],
}
df_head_text = raw_df.head().to_string(index=False)
@ -193,10 +210,10 @@ class ExpDataset:
"description": "",
"target_col": self.target_col,
"metadata": metadata,
"df_head": df_head_text
"df_head": df_head_text,
}
return dataset_info
def get_metric(self):
dataset_info = self.get_dataset_info()
num_classes = dataset_info["metadata"]["NumberOfClasses"]
@ -216,7 +233,6 @@ class ExpDataset:
return req
def save_dataset(self, target_col):
df = self.get_raw_dataset()
if not self.check_dataset_exists() or self.force_update:
print(f"Saving Dataset {self.name} in {self.dataset_dir}")
@ -249,25 +265,22 @@ class ExpDataset:
def split_and_save(self, df, target_col):
if not target_col:
raise ValueError("Target column not provided")
train, test = train_test_split(df, test_size=1-TRAIN_TEST_SPLIT, random_state=SEED)
train, dev = train_test_split(train, test_size=1-TRAIN_DEV_SPLIT, random_state=SEED)
train, test = train_test_split(df, test_size=1 - TRAIN_TEST_SPLIT, random_state=SEED)
train, dev = train_test_split(train, test_size=1 - TRAIN_DEV_SPLIT, random_state=SEED)
self.save_split_datasets(train, "train")
self.save_split_datasets(dev, "dev", target_col)
self.save_split_datasets(test, "test", target_col)
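split_and_save performs a two-stage split: it first carves the test set out of the raw frame, then splits the remainder into train and dev before saving each piece (dev and test also get with/without-target variants). A runnable sketch; the ratio and seed values here are illustrative, while the real TRAIN_TEST_SPLIT, TRAIN_DEV_SPLIT and SEED constants live elsewhere in this module:

import pandas as pd
from sklearn.model_selection import train_test_split

TRAIN_TEST_SPLIT, TRAIN_DEV_SPLIT, SEED = 0.8, 0.75, 0  # illustrative values

df = pd.DataFrame({"feature": range(100), "target": [i % 2 for i in range(100)]})
# Hold out the test set first, then split what is left into train and dev.
train, test = train_test_split(df, test_size=1 - TRAIN_TEST_SPLIT, random_state=SEED)
train, dev = train_test_split(train, test_size=1 - TRAIN_DEV_SPLIT, random_state=SEED)
print(len(train), len(dev), len(test))  # 60 20 20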
class OpenMLExpDataset(ExpDataset):
def __init__(self, name, dataset_dir, dataset_id, **kwargs):
self.dataset_id = dataset_id
self.dataset = openml.datasets.get_dataset(self.dataset_id,
download_data=False,
download_qualities=False,
download_features_meta_data=True)
self.dataset = openml.datasets.get_dataset(
self.dataset_id, download_data=False, download_qualities=False, download_features_meta_data=True
)
self.name = self.dataset.name
self.target_col = self.dataset.default_target_attribute
super().__init__(self.name, dataset_dir, target_col=self.target_col, **kwargs)
def get_raw_dataset(self):
dataset = self.dataset
@ -276,7 +289,7 @@ class OpenMLExpDataset(ExpDataset):
os.makedirs(raw_dir, exist_ok=True)
dataset_df.to_csv(Path(raw_dir, "train.csv"), index=False)
return dataset_df
def get_dataset_info(self):
dataset_info = super().get_dataset_info()
dataset = self.dataset
@ -290,12 +303,14 @@ class OpenMLExpDataset(ExpDataset):
# def __init__(self, name, dataset_dir, dataset_name, **kwargs):
# super().__init__(name, dataset_dir, **kwargs)
async def process_dataset(dataset, solution_designer, save_analysis_pool, datasets_dict):
if save_analysis_pool:
asyncio.run(solution_designer.generate_solutions(dataset.get_dataset_info(), dataset.name))
dataset_dict = create_dataset_dict(dataset)
datasets_dict["datasets"][dataset.name] = dataset_dict
if __name__ == "__main__":
datasets_dir = "D:/work/automl/datasets"
force_update = False

View file

@ -1,5 +1,6 @@
from sklearn.metrics import f1_score, accuracy_score, roc_auc_score, mean_squared_error
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error, roc_auc_score
def evaluate_score(pred, gt, metric):
if metric == "accuracy":
@ -20,4 +21,4 @@ def evaluate_score(pred, gt, metric):
elif metric == "log rmse":
return mean_squared_error(np.log1p(gt), np.log1p(pred), squared=False)
else:
raise ValueError(f"Metric {metric} not supported")
raise ValueError(f"Metric {metric} not supported")

View file

@ -1,7 +1,7 @@
from expo.MCTS import Node, MCTS
import textwrap
from expo.MCTS import Node
NODE_TEMPLATE = """\
[Node {id}]
Plans:
@ -11,21 +11,23 @@ Score: {score}, Visits: {num_visits}
"""
def get_role_plans(role):
plans = role.planner.plan.tasks
instruct_plans = [f"{i+1}. {task.instruction}" for i, task in enumerate(plans)]
return instruct_plans
def get_tree_text(node : Node):
def get_tree_text(node: Node):
role_dict = {}
code_set = set()
def load_role(node):
if node.id not in role_dict:
role_dict[node.id] = node.load_role()
return role_dict[node.id]
def visualize_node(node : Node, previous_plans=None):
def visualize_node(node: Node, previous_plans=None):
role = load_role(node)
node_id = node.id
plans = role.planner.plan.tasks
@ -36,7 +38,9 @@ def get_tree_text(node : Node):
simulated = role.state_saved
score = f"avg score: {node.avg_value()}, simulated score: {node.raw_reward}"
num_visits = node.visited
return NODE_TEMPLATE.format(id=node_id, plans=instruct_plans_text, simulated=simulated, score=score, num_visits=num_visits)
return NODE_TEMPLATE.format(
id=node_id, plans=instruct_plans_text, simulated=simulated, score=score, num_visits=num_visits
)
def visualize_tree(node, depth=0, previous_plans=None):
text = ""
@ -46,11 +50,10 @@ def get_tree_text(node : Node):
code_set.update({task.instruction for task in role.planner.plan.tasks})
previous_plans = get_role_plans(role)
for child in node.children:
text += textwrap.indent(visualize_tree(child, depth+1, previous_plans), "\t")
text += textwrap.indent(visualize_tree(child, depth + 1, previous_plans), "\t")
return text
num_simulations = node.visited
text = f"Number of simulations: {num_simulations}\n"
text += visualize_tree(node)
return text, len(code_set)

View file

@ -1,4 +0,0 @@
from .experimenter import Experimenter
from .mcts import MCTSExperimenter
from .aug import AugExperimenter
from .custom import CustomExperimenter

View file

@ -1,9 +1,8 @@
from experimenter import Experimenter
from expo.MCTS import create_initial_state
from expo.dataset import generate_task_requirement
from expo.utils import mcts_logger, load_execute_notebook, get_exp_pool_path
from expo.insights.instruction_generator import InstructionGenerator
from expo.research_assistant import ResearchAssistant
from expo.utils import get_exp_pool_path
EXPS_PROMPT = """
When doing the tasks, you can refer to the insights below:
@ -12,10 +11,8 @@ When doing the tasks, you can refer to the insights below:
"""
class AugExperimenter(Experimenter):
result_path : str = "results/aug"
result_path: str = "results/aug"
async def run_experiment(self):
# state = create_initial_state(self.args.task, start_task_id=1, data_config=self.data_config, low_is_better=self.args.low_is_better, name="")
@ -31,7 +28,7 @@ class AugExperimenter(Experimenter):
exps = [exp_set_text] * self.args.num_experiments
else:
raise ValueError(f"Invalid mode: {self.args.aug_mode}")
results = []
for i in range(self.args.num_experiments):
di = ResearchAssistant(node_id=str(i), use_reflection=self.args.reflection)
@ -39,20 +36,19 @@ class AugExperimenter(Experimenter):
requirement = user_requirement + EXPS_PROMPT.format(experience=exps[i])
print(requirement)
score_dict = await self.run_di(di, requirement)
results.append({
"idx": i,
"score_dict": score_dict,
"aug_mode": self.args.aug_mode,
"insights" : exps[i],
"user_requirement": requirement,
"args": vars(self.args)
})
results.append(
{
"idx": i,
"score_dict": score_dict,
"aug_mode": self.args.aug_mode,
"insights": exps[i],
"user_requirement": requirement,
"args": vars(self.args),
}
)
scores = [result["score_dict"]["test_score"] for result in results]
avg_score = sum(scores) / len(scores)
best_score = max(scores) if not self.args.low_is_better else min(scores)
best_score_idx = scores.index(best_score)
results.insert(0, {"avg_score": avg_score, "best_score": best_score, "best_score_idx": best_score_idx})
self.save_result(results)

View file

@ -1,13 +1,15 @@
from expo.experimenter.custom import CustomExperimenter
from autogluon.tabular import TabularDataset, TabularPredictor
class AGRunner():
from expo.experimenter.custom import CustomExperimenter
class AGRunner:
preset = "best_quality"
time_limit = 500
def __init__(self, datasets):
self.datasets = datasets
def run(self):
train_path = self.datasets["train"]
test_wo_target_path = self.datasets["test_wo_target"]
@ -16,17 +18,16 @@ class AGRunner():
train_data = TabularDataset(train_path)
test_data = TabularDataset(test_wo_target_path)
dev_data = TabularDataset(dev_wo_target_path)
predictor = TabularPredictor(label=target_col).fit(train_data, presets=self.preset, time_limit=self.time_limit)
test_preds = predictor.predict(test_data)
dev_preds = predictor.predict(dev_data)
return {"test_preds": test_preds, "dev_preds": dev_preds}
class GluonExperimenter(CustomExperimenter):
result_path : str = "results/autogluon"
result_path: str = "results/autogluon"
def __init__(self, args, **kwargs):
super().__init__(args, **kwargs)
self.framework = AGRunner(self.datasets)

View file

@ -1,21 +1,26 @@
from expo.experimenter import Experimenter
from expo.MCTS import create_initial_state
from expo.evaluation.evaluation import evaluate_score
import pandas as pd
import os
import pandas as pd
from expo.evaluation.evaluation import evaluate_score
from expo.experimenter import Experimenter
from expo.MCTS import create_initial_state
class CustomExperimenter(Experimenter):
result_path : str = "results/custom"
result_path: str = "results/custom"
def __init__(self, args, **kwargs):
super().__init__(args, **kwargs)
self.framework = kwargs["framework"] # todo
self.framework = kwargs["framework"] # todo
self.task = kwargs.get("task", self.args.task)
self.low_is_better = kwargs.get("low_is_better", self.args.low_is_better)
self.name = kwargs.get("name", "")
self.result_path = f"results/custom_{self.name}"
self.state = create_initial_state(self.task, start_task_id=1, data_config=self.data_config, low_is_better=self.low_is_better, name=self.name)
self.state = create_initial_state(
self.task, start_task_id=1, data_config=self.data_config, low_is_better=self.low_is_better, name=self.name
)
def run_experiment(self):
user_requirement = self.state["requirement"]
preds = self.framework.run(user_requirement)
@ -23,13 +28,9 @@ class CustomExperimenter(Experimenter):
dev_preds = preds["dev_preds"]
score_dict = {
"dev_score": self.evaluate_predictions(dev_preds, "dev"),
"test_score": self.evaluate_predictions(test_preds, "test")
}
results = {
"score_dict": score_dict,
"user_requirement": user_requirement,
"args": vars(self.args)
"test_score": self.evaluate_predictions(test_preds, "test"),
}
results = {"score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)}
self.save_result(results)
def evaluate_pred_files(self, dev_pred_path, test_pred_path):
@ -37,7 +38,7 @@ class CustomExperimenter(Experimenter):
test_preds = pd.read_csv(test_pred_path)["target"]
score_dict = {
"dev_score": self.evaluate_score(dev_preds, "dev"),
"test_score": self.evaluate_score(test_preds, "test")
"test_score": self.evaluate_score(test_preds, "test"),
}
return score_dict
@ -46,8 +47,7 @@ class CustomExperimenter(Experimenter):
gt_path = os.path.join(self.state["datasets_dir"][f"{split}_target"])
gt = pd.read_csv(gt_path)["target"]
score = evaluate_score(preds, gt, metric)
return score
return score
def load_datasets(self):
train_path = self.state["datasets_dir"]["train"]
@ -57,4 +57,3 @@ class CustomExperimenter(Experimenter):
dev = pd.read_csv(dev_path)
test = pd.read_csv(test_path)
return train, dev, test

View file

@ -1,23 +1,29 @@
from expo.utils import DATA_CONFIG
import os
import pandas as pd
from expo.evaluation.evaluation import evaluate_score
import datetime
import json
import os
import pandas as pd
from expo.evaluation.evaluation import evaluate_score
from expo.MCTS import create_initial_state
from expo.research_assistant import ResearchAssistant
from expo.utils import DATA_CONFIG
class Experimenter:
result_path : str = "results/base"
result_path: str = "results/base"
data_config = DATA_CONFIG
def __init__(self, args, **kwargs):
self.args = args
self.start_time = datetime.datetime.now().strftime("%Y%m%d%H%M")
self.state = create_initial_state(self.args.task, start_task_id=1, data_config=self.data_config, low_is_better=self.args.low_is_better, name="")
self.state = create_initial_state(
self.args.task,
start_task_id=1,
data_config=self.data_config,
low_is_better=self.args.low_is_better,
name="",
)
async def run_di(self, di, user_requirement):
max_retries = 3
@ -33,14 +39,8 @@ class Experimenter:
print(f"Error: {e}")
num_runs += 1
if not run_finished:
score_dict = {
"train_score": -1,
"dev_score": -1,
"test_score": -1,
"score": -1
}
score_dict = {"train_score": -1, "dev_score": -1, "test_score": -1, "score": -1}
return score_dict
async def run_experiment(self):
state = self.state
@ -50,28 +50,28 @@ class Experimenter:
for i in range(self.args.num_experiments):
di = ResearchAssistant(node_id="0", use_reflection=self.args.reflection)
score_dict = await self.run_di(di, user_requirement)
results.append({
"idx": i,
"score_dict": score_dict,
"user_requirement": user_requirement,
"args": vars(self.args)
})
self.save_result(results) # save intermediate results
results.append(
{"idx": i, "score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)}
)
self.save_result(results) # save intermediate results
dev_scores = [result["score_dict"]["dev_score"] for result in results]
best_dev_score = max(dev_scores) if not self.args.low_is_better else min(dev_scores)
best_score_idx = dev_scores.index(best_dev_score)
test_scores = [result["score_dict"]["test_score"] for result in results]
avg_score = sum(test_scores) / len(test_scores)
global_best_score = max(test_scores) if not self.args.low_is_better else min(test_scores)
results.insert(0, {
"best_dev_score": best_dev_score,
"best_score_idx": best_score_idx,
"best_test_score": test_scores[best_score_idx],
"avg_test_score": avg_score,
"best_score": global_best_score
})
results.insert(
0,
{
"best_dev_score": best_dev_score,
"best_score_idx": best_score_idx,
"best_test_score": test_scores[best_score_idx],
"avg_test_score": avg_score,
"best_score": global_best_score,
},
)
self.save_result(results)
def evaluate_prediction(self, split, state):
@ -85,7 +85,7 @@ class Experimenter:
metric = state["dataset_config"]["metric"]
os.remove(pred_path)
return evaluate_score(preds, gt, metric)
def evaluate(self, score_dict, state):
scores = {
"dev_score": self.evaluate_prediction("dev", state),
@ -94,13 +94,12 @@ class Experimenter:
score_dict.update(scores)
return score_dict
def save_result(self, result):
end_time = datetime.datetime.now().strftime("%Y%m%d%H%M")
time_info = {
"start_time": self.start_time,
"end_time": end_time,
"duration (minutes)": float(end_time) - float(self.start_time)
"duration (minutes)": float(end_time) - float(self.start_time),
}
result = result.copy()
result.insert(0, time_info)

View file

@ -1,22 +1,25 @@
from expo.experimenter import Experimenter
from expo.dataset import generate_task_requirement
from expo.MCTS import MCTS
from expo.evaluation.visualize_mcts import get_tree_text
from expo.experimenter import Experimenter
from expo.MCTS import MCTS
class MCTSExperimenter(Experimenter):
result_path : str = "results/mcts"
result_path: str = "results/mcts"
async def run_experiment(self):
mcts = MCTS(root_node=None, max_depth=5)
best_nodes = await mcts.search(self.args.task, self.data_config,
low_is_better=self.args.low_is_better,
load_tree=self.args.load_tree,
reflection=self.args.reflection,
rollouts=self.args.rollouts,
name=self.args.name)
best_nodes = await mcts.search(
self.args.task,
self.data_config,
low_is_better=self.args.low_is_better,
load_tree=self.args.load_tree,
reflection=self.args.reflection,
rollouts=self.args.rollouts,
name=self.args.name,
)
best_node = best_nodes["global_best"]
dev_best_node = best_nodes["dev_best"]
text, num_generated_codes = get_tree_text(mcts.root_node)
text += f"Generated {num_generated_codes} unique codes.\n"
text += f"Best node: {best_node}, score: {best_node.raw_reward}\n"
@ -24,22 +27,21 @@ class MCTSExperimenter(Experimenter):
print(text)
self.save_tree(text)
results = [{
"best_node": best_node.id,
"best_node_score": best_node.raw_reward,
"dev_best_node": dev_best_node.id,
"dev_best_node_score": dev_best_node.raw_reward,
"num_generated_codes": num_generated_codes,
"user_requirement": best_node.state["requirement"],
"tree_text": text,
"args": vars(self.args)
}]
results = [
{
"best_node": best_node.id,
"best_node_score": best_node.raw_reward,
"dev_best_node": dev_best_node.id,
"dev_best_node_score": dev_best_node.raw_reward,
"num_generated_codes": num_generated_codes,
"user_requirement": best_node.state["requirement"],
"tree_text": text,
"args": vars(self.args),
}
]
self.save_result(results)
def save_tree(self, tree_text):
fpath = f"{self.result_path}/{self.args.task}_tree_{self.args.name}.txt"
with open(fpath, "w") as f:
f.write(tree_text)

View file

@ -1,3 +1,10 @@
import json
import random
from expo.utils import clean_json_from_rsp, load_data_config, mcts_logger
from metagpt.llm import LLM
from metagpt.schema import Message
REFLECTION_SYSTEM_MSG = "As a Kaggle grandmaster participating in a competition, you need to analyze your experience and propose evolutionary points that are more likely to improve the performance of baseline code."
CHANGE_INSTRUCTION = """
@ -18,12 +25,6 @@ Rewrite the original instruction according to the insights
```
"""
import re
import random
import json
from metagpt.llm import LLM
from metagpt.schema import Message
from expo.utils import load_data_config, mcts_logger, clean_json_from_rsp
DATA_CONFIG = load_data_config()
@ -31,7 +32,7 @@ class InstructionGenerator:
data_config = DATA_CONFIG
@staticmethod
def load_json_data(json_dir):
def load_json_data(json_dir):
with open(json_dir, "r") as file:
json_data = json.load(file)
return json_data
@ -39,7 +40,7 @@ class InstructionGenerator:
@staticmethod
def _random_sample(analysis, num_samples):
return random.sample(analysis, num_samples)
@staticmethod
def sample_instruction_set(data):
data_dict = {}
@ -52,12 +53,12 @@ class InstructionGenerator:
for task_id in sorted(data_dict.keys()):
instruction_set.append(random.choice(data_dict[task_id]))
return instruction_set
@staticmethod
def format_output(rsp):
rsp_list = []
new_data = []
rsp_list.append(rsp)
new_data = []
rsp_list.append(rsp)
for item in rsp_list:
item_dict = json.loads(item)
data = {
@ -83,21 +84,19 @@ class InstructionGenerator:
new_instructions = []
if len(data) == 0:
mcts_logger.log("MCTS", f"No insights available for task {task_id}")
return [original_instruction] # Return the original instruction if no insights are available
return [original_instruction] # Return the original instruction if no insights are available
for item in data[:max_num]:
insights = item["Analysis"]
new_instruction = await InstructionGenerator.generate_new_instruction(original_instruction, insights)
new_instructions.append(new_instruction)
return new_instructions
@staticmethod
async def generate_new_instruction(original_instruction, insights):
prompt = CHANGE_INSTRUCTION.format(instruction=original_instruction, insights=insights)
llm = LLM()
context = llm.format_msg([Message(content=prompt, role="user")])
llm_response = await llm.aask(
context, system_msgs=[REFLECTION_SYSTEM_MSG]
)
context = llm.format_msg([Message(content=prompt, role="user")])
llm_response = await llm.aask(context, system_msgs=[REFLECTION_SYSTEM_MSG])
rsp = clean_json_from_rsp(llm_response)
new_instruction = json.loads(rsp)["New Instruction"]
return new_instruction
return new_instruction

View file

@ -1,10 +1,7 @@
import re
import random
import json
from metagpt.llm import LLM
from metagpt.schema import Message
from expo.utils import clean_json_from_rsp, load_data_config
from expo.utils import clean_json_from_rsp, load_data_config
from metagpt.llm import LLM
DATA_CONFIG = load_data_config()
@ -72,56 +69,50 @@ Make sure each method is independent and can be implemented separately.
"""
KEY_DATASET_FEATURES = [
'NumberOfClasses',
'NumberOfFeatures',
'NumberOfInstances',
'NumberOfInstancesWithMissingValues',
'NumberOfMissingValues',
'NumberOfNumericFeatures',
'NumberOfSymbolicFeatures'
"NumberOfClasses",
"NumberOfFeatures",
"NumberOfInstances",
"NumberOfInstancesWithMissingValues",
"NumberOfMissingValues",
"NumberOfNumericFeatures",
"NumberOfSymbolicFeatures",
]
TASK_TO_ID = {
"EDA": 1,
"Data Preprocessing": 2,
"Feature Engineering": 3,
"Model Training": 4,
"Model Evaluation": 5
}
TASK_TO_ID = {"EDA": 1, "Data Preprocessing": 2, "Feature Engineering": 3, "Model Training": 4, "Model Evaluation": 5}
class SolutionDesigner:
data_dir : str= DATA_CONFIG["datasets_dir"]
data_dir: str = DATA_CONFIG["datasets_dir"]
async def generate_solutions(self, dataset_info, dataset_name):
llm = LLM()
context = DATASET_INSIGHT_PROMPT.format(dataset=dataset_info["description"],
metadata=self.metadata_builder(dataset_info["metadata"]),
head=dataset_info["df_head"])
context = DATASET_INSIGHT_PROMPT.format(
dataset=dataset_info["description"],
metadata=self.metadata_builder(dataset_info["metadata"]),
head=dataset_info["df_head"],
)
rsp = await llm.aask(context)
rsp = clean_json_from_rsp(rsp)
analysis_pool = self.process_analysis_pool(json.loads(rsp))
dataset_path = f"{self.data_dir}/{dataset_name}"
self.save_analysis_pool(dataset_path, analysis_pool)
def process_analysis_pool(self, insights_rsp):
analysis_pool = []
for task_type_insights in insights_rsp:
task_type = task_type_insights["task_type"]
for insight in task_type_insights["insights"]:
analysis_pool.append({"Analysis": insight, "Category": task_type, "task_id": TASK_TO_ID[task_type]})
analysis_pool.append({"Analysis": insight, "Category": task_type, "task_id": TASK_TO_ID[task_type]})
return analysis_pool
def metadata_builder(self, qualities):
metadata = {}
for key in KEY_DATASET_FEATURES:
metadata[key] = qualities.get(key, "N/A")
metadata_text = json.dumps(metadata, indent=4)
return metadata_text
def save_analysis_pool(self, dataset_path, analysis_pool):
fpath = f"{dataset_path}/ds_analysis_pool.json"
with open(fpath, "w") as file:
json.dump(analysis_pool, file, indent=4)
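process_analysis_pool flattens the LLM's per-task insight lists into one pool of records tagged with a Category and a numeric task_id (via TASK_TO_ID), which is what later gets written to ds_analysis_pool.json. A sketch of that transformation with invented insight text:

TASK_TO_ID = {"EDA": 1, "Data Preprocessing": 2, "Feature Engineering": 3, "Model Training": 4, "Model Evaluation": 5}

insights_rsp = [  # shape of the parsed LLM response; the texts are made up
    {"task_type": "Data Preprocessing", "insights": ["Impute missing ages with the median"]},
    {"task_type": "Model Training", "insights": ["Start from a gradient-boosting baseline"]},
]
analysis_pool = [
    {"Analysis": insight, "Category": item["task_type"], "task_id": TASK_TO_ID[item["task_type"]]}
    for item in insights_rsp
    for insight in item["insights"]
]
print(analysis_pool[0])  # {'Analysis': ..., 'Category': 'Data Preprocessing', 'task_id': 2}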

View file

@ -1,19 +1,16 @@
from __future__ import annotations
import json
import os
from pydantic import model_validator
from expo.utils import mcts_logger, save_notebook
from metagpt.actions.di.write_analysis_code import WriteAnalysisCode
from metagpt.const import SERDESER_PATH
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.schema import Message, Task, TaskResult
from metagpt.strategy.task_type import TaskType
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.utils.common import CodeParser
from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info
from metagpt.const import MESSAGE_ROUTE_TO_ALL, SERDESER_PATH
from expo.utils import mcts_logger, save_notebook
from pydantic import Field, model_validator
from metagpt.actions.di.write_analysis_code import CheckData, WriteAnalysisCode
import re
import os
from metagpt.utils.common import CodeParser, write_json_file
EXTRACT_SCORE_PROMPT = """
# Code:
@ -36,39 +33,48 @@ If you cannot find the scores, please still return a dictionary with the keys 't
```
"""
class ResearchAssistant(DataInterpreter):
node_id: str = "0"
start_task_id: int = 1
state_saved : bool = False
role_dir : str = SERDESER_PATH.joinpath("team", "environment", "roles", f"Experimenter")
state_saved: bool = False
role_dir: str = SERDESER_PATH.joinpath("team", "environment", "roles", "Experimenter")
def get_node_name(self):
return f"Node-{self.node_id}"
def get_next_instruction(self):
return self.planner.plan.tasks[self.start_task_id]
def change_next_instruction(self, new_instruction):
if new_instruction is not None:
self.planner.plan.task_map[str(self.start_task_id)].instruction = new_instruction
self.remap_tasks()
def update_til_start_task(self, role: ResearchAssistant, backward: bool = True):
if backward:
# make sure the previous task instructions are matched
assert self.start_task_id == role.start_task_id - 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
assert (
self.start_task_id == role.start_task_id - 1
), f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
for i in range(self.start_task_id):
if self.planner.plan.task_map[str(self.start_task_id)].instruction != role.planner.plan.task_map[str(self.start_task_id)].instruction:
if (
self.planner.plan.task_map[str(self.start_task_id)].instruction
!= role.planner.plan.task_map[str(self.start_task_id)].instruction
):
mcts_logger.info("Previous task instructions not matched")
self.remap_tasks()
return
# copy new role's task (self.start_task_id) to current role
self.planner.plan.task_map[str(self.start_task_id)] = role.planner.plan.task_map[str(self.start_task_id)].model_copy()
self.planner.plan.task_map[str(self.start_task_id)] = role.planner.plan.task_map[
str(self.start_task_id)
].model_copy()
self.remap_tasks()
else:
assert self.start_task_id == role.start_task_id + 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
assert (
self.start_task_id == role.start_task_id + 1
), f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
if int(role.planner.plan.current_task_id) > self.start_task_id:
for i in range(role.start_task_id):
self.planner.plan.task_map[str(i)] = role.planner.plan.task_map[str(i)].model_copy()
@ -86,11 +92,10 @@ class ResearchAssistant(DataInterpreter):
json_block = CodeParser.parse_code(block=None, text=rsp)
score_dict = json.loads(json_block)
return score_dict
@model_validator(mode="after")
def set_plan_and_tool(self) -> "Interpreter":
if self.planner.plan.goal != '':
if self.planner.plan.goal != "":
self.set_actions([WriteAnalysisCode])
self._set_state(0)
print("Plan already exists, skipping initialization.")
@ -116,17 +121,17 @@ class ResearchAssistant(DataInterpreter):
self.state_saved = True
mcts_logger.log("MCTS", f"Saving state at task {self.start_task_id}")
else:
mcts_logger.log("MCTS", f"Static Saving")
mcts_logger.log("MCTS", "Static Saving")
stg_path = self.role_dir
name = self.get_node_name()
role_path = os.path.join(stg_path, f"{name}.json")
# save the state as a JSON file
write_json_file(role_path, self.model_dump())
def remap_tasks(self):
self.planner.plan.tasks = [self.planner.plan.task_map[task_id] for task_id in sorted(self.planner.plan.task_map.keys())]
self.planner.plan.tasks = [
self.planner.plan.task_map[task_id] for task_id in sorted(self.planner.plan.task_map.keys())
]
async def run(self, with_message=None) -> Message | None:
"""Observe, and think and act based on the results of the observation"""
@ -138,13 +143,9 @@ class ResearchAssistant(DataInterpreter):
self.rc.working_memory.clear()
self.working_memory.clear()
# self.rc.todo = WriteAnalysisCode()
rsp = await self.react()
rsp = await self.react()
# send the response message to the Environment object so it can forward it to subscribers
self.set_todo(None)
self.publish_message(rsp)
return rsp
return await super().run(with_message)

View file

@ -1,15 +1,17 @@
import os
from expo.research_assistant import ResearchAssistant
import argparse
import asyncio
from expo.utils import DATA_CONFIG, get_exp_pool_path
import datetime
import json
import os
import pandas as pd
from expo.dataset import generate_task_requirement
from expo.evaluation.evaluation import evaluate_score
from expo.insights.instruction_generator import InstructionGenerator
from expo.MCTS import create_initial_state
from expo.evaluation.evaluation import evaluate_score
import json
import argparse
import pandas as pd
import datetime
from expo.research_assistant import ResearchAssistant
from expo.utils import DATA_CONFIG, get_exp_pool_path
EXPS_PROMPT = """
When doing the tasks, you can refer to the insights below:
@ -18,13 +20,14 @@ When doing the tasks, you can refer to the insights below:
"""
data_config = DATA_CONFIG
def evaluate_test(score, state):
datetime_text = datetime.datetime.now().strftime("%Y%m%d%H%M")
task_name = state["task"]
prediction_fpath = os.path.join(state["work_dir"], task_name, "predictions.csv")
predictions = pd.read_csv(prediction_fpath)["target"]
# copy predictions.csv to the node_dir
predictions_node_fpath = os.path.join("results", f"{task_name}-{datetime_text}-predictions.csv")
predictions.to_csv(predictions_node_fpath, index=False)
# load test_target.csv
@ -35,8 +38,6 @@ def evaluate_test(score, state):
return score
async def main(task_name, use_reflection=True, mode="single", num_experiments=2):
"""
mode: single or set
@ -44,8 +45,10 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2)
set: sample a set of instructions
"""
low_is_better = False
state = create_initial_state(task_name, start_task_id=1, data_config=data_config, low_is_better=low_is_better, name="")
state = create_initial_state(
task_name, start_task_id=1, data_config=data_config, low_is_better=low_is_better, name=""
)
user_requirement = generate_task_requirement(task_name, data_config)
exp_pool_path = get_exp_pool_path(task_name, data_config, pool_name="ds_analysis_pool")
exp_pool = InstructionGenerator.load_analysis_pool(exp_pool_path)
@ -58,7 +61,7 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2)
exps = [exp_set_text] * num_experiments
else:
raise ValueError(f"Invalid mode: {mode}")
scores = []
for i in range(num_experiments):
di = ResearchAssistant(node_id=str(i), use_reflection=use_reflection)
@ -70,16 +73,18 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2)
score = evaluate_test(score, state)
scores.append(score)
with open(f"results/{task_name}_scores.json", "w") as f:
# save scores and corresponding insights
results = {"avg_score": sum([score["test_score"] for score in scores if score])/num_experiments,
"max_score": max([score["test_score"] for score in scores]),
"scores": scores, "insights": exps}
results = {
"avg_score": sum([score["test_score"] for score in scores if score]) / num_experiments,
"max_score": max([score["test_score"] for score in scores]),
"scores": scores,
"insights": exps,
}
json.dump(results, f, indent=4)
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--task", type=str, default="titanic")
@ -90,8 +95,9 @@ def parse_args():
parser.add_argument("--num_experiments", type=int, default=2)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
asyncio.run(main(args.task, use_reflection=args.use_reflection, mode=args.mode, num_experiments=args.num_experiments))
asyncio.run(
main(args.task, use_reflection=args.use_reflection, mode=args.mode, num_experiments=args.num_experiments)
)

View file

@ -1,6 +1,12 @@
from expo.experimenter import MCTSExperimenter, Experimenter, AugExperimenter, CustomExperimenter
import asyncio
import argparse
import asyncio
from expo.experimenter import (
AugExperimenter,
CustomExperimenter,
Experimenter,
MCTSExperimenter,
)
def get_args():
@ -19,6 +25,7 @@ def get_mcts_args(parser):
parser.set_defaults(load_tree=False)
parser.add_argument("--rollouts", type=int, default=5)
def get_aug_exp_args(parser):
parser.add_argument("--aug_mode", type=str, default="single", choices=["single", "set"])
parser.add_argument("--num_experiments", type=int, default=1)
@ -31,7 +38,7 @@ def get_di_args(parser):
parser.add_argument("--reflection", dest="reflection", action="store_true")
parser.add_argument("--no_reflection", dest="reflection", action="store_false")
parser.set_defaults(reflection=True)
async def main(args):
if args.exp_mode == "mcts":
@ -46,6 +53,7 @@ async def main(args):
raise ValueError(f"Invalid exp_mode: {args.exp_mode}")
await experimenter.run_experiment()
if __name__ == "__main__":
args = get_args()
asyncio.run(main(args))
asyncio.run(main(args))

View file

@ -1,10 +1,9 @@
from expo.MCTS import MCTS, Node, initialize_di_root_node
from expo.utils import load_data_config
from expo.dataset import generate_task_requirement
import argparse
import asyncio
from expo.evaluation.visualize_mcts import get_tree_text
import asyncio
import argparse
from expo.MCTS import MCTS
from expo.utils import load_data_config
def get_args():
@ -35,9 +34,17 @@ if __name__ == "__main__":
# asyncio.run(root_node.run_node())
mcts = MCTS(root_node=None, max_depth=5)
best_nodes = asyncio.run(mcts.search(args.task, data_config,
low_is_better=args.low_is_better, load_tree=args.load_tree,
reflection=args.reflection, rollouts=args.rollouts, name=args.name))
best_nodes = asyncio.run(
mcts.search(
args.task,
data_config,
low_is_better=args.low_is_better,
load_tree=args.load_tree,
reflection=args.reflection,
rollouts=args.rollouts,
name=args.name,
)
)
best_node = best_nodes["global_best"]
dev_best_node = best_nodes["dev_best"]
text, num_generated_codes = get_tree_text(mcts.root_node)
@ -49,5 +56,3 @@ if __name__ == "__main__":
f.write(f"Best node: {best_node}, score: {best_node.raw_reward}\n")
f.write(f"Dev best node: {dev_best_node}, score: {dev_best_node.raw_reward}\n")
f.write(text)

View file

@ -1,50 +1,58 @@
import yaml
from metagpt.roles.role import Role
from metagpt.actions.di.execute_nb_code import ExecuteNbCode
# from nbclient import NotebookClient
from nbformat.notebooknode import NotebookNode
import nbformat
from pathlib import Path
from loguru import logger as _logger
from datetime import datetime
import sys
import os
import re
import sys
from datetime import datetime
from pathlib import Path
import nbformat
import yaml
from loguru import logger as _logger
# from nbclient import NotebookClient
from nbformat.notebooknode import NotebookNode
from metagpt.roles.role import Role
def load_data_config(file_path="data.yaml"):
with open(file_path, 'r') as stream:
with open(file_path, "r") as stream:
data_config = yaml.safe_load(stream)
return data_config
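load_data_config reads the experiment-wide data.yaml. A self-contained sketch that writes and reloads a minimal config consistent with the keys used across this diff (datasets_dir, work_dir, role_dir and a per-dataset entry); the paths and the titanic values are placeholders, not the project's real config:

import yaml

example = {  # placeholder values
    "datasets_dir": "datasets",
    "work_dir": "workspace",
    "role_dir": "roles",
    "datasets": {
        "titanic": {
            "dataset": "04_titanic",
            "user_requirement": "Predict the target column `Survived`.",
            "metric": "f1",
            "target_col": "Survived",
        }
    },
}
with open("data.yaml", "w") as f:
    yaml.dump(example, f)

with open("data.yaml", "r") as stream:
    data_config = yaml.safe_load(stream)
print(data_config["datasets"]["titanic"]["target_col"])  # Survived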
DATA_CONFIG = load_data_config()
def get_mcts_logger():
print_level = "INFO"
print_level2 = "MCTS"
logfile_level="MCTS"
logfile_level = "MCTS"
name: str = None
current_date = datetime.now()
formatted_date = current_date.strftime("%Y%m%d")
log_name = f"{name}_{formatted_date}" if name else formatted_date # name a log with prefix name
_logger.remove()
new_level = _logger.level(logfile_level, color="<green>", no=25)
_logger.level(logfile_level, color="<green>", no=25)
_logger.add(sys.stderr, level=print_level)
_logger.add(sys.stderr, level=print_level2)
_logger.add(Path(DATA_CONFIG["work_dir"]) / DATA_CONFIG["role_dir"] / f"{log_name}.txt", level=logfile_level)
_logger.propagate = False
return _logger
mcts_logger = get_mcts_logger()
def get_exp_pool_path(task_name, data_config, pool_name="analysis_pool"):
datasets_dir = data_config['datasets_dir']
if task_name in data_config['datasets']:
dataset = data_config['datasets'][task_name]
data_path = os.path.join(datasets_dir, dataset['dataset'])
datasets_dir = data_config["datasets_dir"]
if task_name in data_config["datasets"]:
dataset = data_config["datasets"][task_name]
data_path = os.path.join(datasets_dir, dataset["dataset"])
else:
raise ValueError(f"Dataset {task_name} not found in config file. Available datasets: {data_config['datasets'].keys()}")
raise ValueError(
f"Dataset {task_name} not found in config file. Available datasets: {data_config['datasets'].keys()}"
)
exp_pool_path = os.path.join(data_path, f"{pool_name}.json")
return exp_pool_path
@ -60,7 +68,6 @@ def change_plan(role, plan):
if not finished:
tasks[i].plan = plan
return finished
def is_cell_to_delete(cell: NotebookNode) -> bool:
@ -82,12 +89,14 @@ def process_cells(nb: NotebookNode) -> NotebookNode:
nb["cells"] = new_cells
return nb
def save_notebook(role: Role, save_dir: str = "", name: str = ""):
save_dir = Path(save_dir)
nb = process_cells(role.execute_code.nb)
file_path = save_dir / f"{name}.ipynb"
nbformat.write(nb, file_path)
async def load_execute_notebook(role):
tasks = role.planner.plan.tasks
codes = [task.code for task in tasks if task.code]
@ -99,6 +108,7 @@ async def load_execute_notebook(role):
print("Finish executing the loaded notebook")
return executor
def clean_json_from_rsp(text):
pattern = r"```json(.*?)```"
matches = re.findall(pattern, text, re.DOTALL)
@ -106,4 +116,4 @@ def clean_json_from_rsp(text):
json_str = "\n".join(matches)
return json_str
else:
return ""
return ""