Merge branch 'experimenter' into 'expo'

Experimenter

See merge request agents/exp_optimizer!2
This commit is contained in:
林义章 2024-09-04 10:00:56 +00:00
commit 697deb97db
18 changed files with 533 additions and 398 deletions

View file

@ -1,23 +1,28 @@
import random
import math
import os
import pandas as pd
from expo.research_assistant import ResearchAssistant
from expo.insights.instruction_generator import InstructionGenerator
from expo.dataset import get_split_dataset_path, generate_task_requirement
from expo.evaluation.evaluation import evaluate_score
from expo.utils import mcts_logger, load_execute_notebook, get_exp_pool_path
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info
import numpy as np
import pickle
import random
import pandas as pd
from expo.dataset import generate_task_requirement, get_split_dataset_path
from expo.evaluation.evaluation import evaluate_score
from expo.insights.instruction_generator import InstructionGenerator
from expo.research_assistant import ResearchAssistant
from expo.utils import get_exp_pool_path, load_execute_notebook, mcts_logger
from metagpt.tools.tool_recommend import ToolRecommender
from metagpt.utils.common import read_json_file
def initialize_di_root_node(task, data_config, low_is_better=False, reflection=True, name=""):
start_task_id = 2
state = create_initial_state(task, start_task_id=start_task_id, data_config=data_config, low_is_better=low_is_better, name=name)
role = ResearchAssistant(node_id="0", start_task_id=start_task_id, use_reflection=reflection, role_dir=state["node_dir"])
return role, Node(parent=None, state=state, action=None, value=0)
state = create_initial_state(
task, start_task_id=start_task_id, data_config=data_config, low_is_better=low_is_better, name=name
)
role = ResearchAssistant(
node_id="0", start_task_id=start_task_id, use_reflection=reflection, role_dir=state["node_dir"]
)
return role, Node(parent=None, state=state, action=None, value=0)
def create_initial_state(task, start_task_id, data_config, low_is_better, name):
@ -36,16 +41,16 @@ def create_initial_state(task, start_task_id, data_config, low_is_better, name):
return initial_state
class Node():
state : dict = {}
action : str = None
value : float = 0
visited : int = 0
children : list = []
normalized_reward : dict = {"train_score": 0, "dev_score": 0, "test_score": 0}
class Node:
state: dict = {}
action: str = None
value: float = 0
visited: int = 0
children: list = []
normalized_reward: dict = {"train_score": 0, "dev_score": 0, "test_score": 0}
parent = None
def __init__(self, parent=None, state = None, action=None, value = 0, max_depth=4, **kwargs):
def __init__(self, parent=None, state=None, action=None, value=0, max_depth=4, **kwargs):
self.state = state
self.action = action
self.value = value
@ -66,14 +71,14 @@ class Node():
def __hash__(self):
return hash(self.id)
def save_node(self):
os.makedirs(self.state["node_dir"], exist_ok=True)
with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'wb') as f:
os.makedirs(self.state["node_dir"], exist_ok=True)
with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), "wb") as f:
pickle.dump(self, f)
def load_node(self):
with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'rb') as f:
with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), "rb") as f:
return pickle.load(f)
def get_depth(self):
@ -94,14 +99,14 @@ class Node():
def is_terminal(self):
return int(self.state["start_task_id"]) == self.max_depth + 1
def is_fully_expanded(self):
return len(self.children) > 0
def add_child(self, child_node):
self.children.append(child_node)
def update(self, reward:dict, child_node=None):
def update(self, reward: dict, child_node=None):
if child_node is not None:
child_role = child_node.load_role()
role = self.load_role()
@ -117,46 +122,48 @@ class Node():
fname = f"Node-{self.id}.json"
role_path = os.path.join(self.state["node_dir"], fname)
return role_path
def load_role(self):
role_dict = read_json_file(self.get_role_path())
if role_dict.get('tool_recommender') is None:
role_dict['tool_recommender'] = ToolRecommender()
elif isinstance(role_dict.get('tool_recommender', {}).get('tools'), dict):
role_dict['tool_recommender']['tools'] = list(role_dict['tool_recommender']['tools'].keys())
if role_dict.get("tool_recommender") is None:
role_dict["tool_recommender"] = ToolRecommender()
elif isinstance(role_dict.get("tool_recommender", {}).get("tools"), dict):
role_dict["tool_recommender"]["tools"] = list(role_dict["tool_recommender"]["tools"].keys())
role = ResearchAssistant(**role_dict)
if self.parent is not None: # TODO: Check this
if self.parent is not None: # TODO: Check this
parent_role = self.parent.load_role()
role.update_til_start_task(parent_role, backward=False)
role.remap_tasks()
return role
def save_new_role(self, role: ResearchAssistant):
role.node_id = self.id
role.start_task_id = self.state['start_task_id']
role.start_task_id = self.state["start_task_id"]
role.state_saved = False
role.change_next_instruction(self.action)
role.change_next_instruction(self.action)
mcts_logger.log("MCTS", f"Saving new role: {role.node_id}")
role.save_state(static_save=True)
async def expand(self, max_children):
if self.is_fully_expanded():
return
insight_geneartor = InstructionGenerator()
role = self.load_role()
original_instruction = role.get_next_instruction()
insights = await insight_geneartor.generate_new_instructions(task_id=role.start_task_id + 1,
original_instruction=original_instruction,
max_num=max_children,
file_path=self.state["exp_pool_path"])
insights = await insight_geneartor.generate_new_instructions(
task_id=role.start_task_id + 1,
original_instruction=original_instruction,
max_num=max_children,
file_path=self.state["exp_pool_path"],
)
new_state = self.state.copy()
new_state['start_task_id'] += 1
new_state["start_task_id"] += 1
for insight in insights:
new_role = role.model_copy()
node = Node(parent=self, state=new_state, action=insight, value=0)
node.save_new_role(new_role)
self.add_child(node)
# def evaluate_test(self):
# prediction_fpath = os.path.join(self.state["work_dir"], self.state["task"], "predictions.csv")
# predictions = pd.read_csv(prediction_fpath)["target"]
@ -168,7 +175,7 @@ class Node():
# gt = pd.read_csv(os.path.join(split_datasets_dir["test_target"]))["target"]
# metric = self.state["dataset_config"]["metric"]
# return evaluate_score(predictions, gt, metric)
def evaluate_prediction(self, split):
pred_path = os.path.join(self.state["work_dir"], self.state["task"], f"{split}_predictions.csv")
pred_node_path = os.path.join(self.state["node_dir"], f"Node-{self.id}-{split}_predictions.csv")
@ -177,28 +184,38 @@ class Node():
preds.to_csv(pred_node_path, index=False)
gt = pd.read_csv(gt_path)["target"]
metric = self.state["dataset_config"]["metric"]
# remove original predictions.csv
os.remove(pred_path)
return evaluate_score(preds, gt, metric)
def evaluate_simulation(self, score_dict):
scores = {
"dev_score": self.evaluate_prediction("dev"),
"test_score": self.evaluate_prediction("test")
}
scores = {"dev_score": self.evaluate_prediction("dev"), "test_score": self.evaluate_prediction("test")}
score_dict.update(scores)
return score_dict
async def run_node(self, role=None):
if self.is_terminal() and role is not None:
if role.state_saved:
return self.raw_reward
if not role:
role = self.load_role()
await load_execute_notebook(role) # execute previous notebook's code
await role.run(with_message='continue')
else:
await role.run(with_message=self.state['requirement'])
max_retries = 3
num_runs = 1
run_finished = False
while num_runs <= max_retries and not run_finished:
try:
if not role:
role = self.load_role()
await load_execute_notebook(role) # execute previous notebook's code
await role.run(with_message="continue")
else:
await role.run(with_message=self.state["requirement"])
run_finished = True
except Exception as e:
mcts_logger.log("MCTS", f"Error in running the role: {e}")
num_runs += 1
if not run_finished:
mcts_logger.log("MCTS", f"Role {role.node_id} failed to run")
return {"test_score": 0, "dev_score": 0, "score": 0}
score_dict = await role.get_score()
score_dict = self.evaluate_simulation(score_dict)
self.raw_reward = score_dict
@ -208,18 +225,19 @@ class Node():
if score == -1:
return 0
return 1 / (1 + score)
score_dict = {k: normalize_score(v) for k, v in score_dict.items()}
self.normalized_reward = score_dict
return score_dict
class MCTS():
#data_path
root_node : Node = None
children : dict = {}
max_depth : int = 5
c_explore : float = 1.4
c_unvisited : float = 0.8
class MCTS:
# data_path
root_node: Node = None
children: dict = {}
max_depth: int = 5
c_explore: float = 1.4
c_unvisited: float = 0.8
def __init__(self, root_node, max_depth):
self.root_node = root_node
@ -229,34 +247,34 @@ class MCTS():
node = self.best_child()
mcts_logger.log("MCTS", f"Selected node id: {node.id}")
return node
def best_child(self):
def uct(node: Node):
n_visits = node.visited if node.visited else self.c_unvisited
avg_value = node.avg_value() if node.visited else node.value/self.c_unvisited
avg_value = node.avg_value() if node.visited else node.value / self.c_unvisited
return avg_value + self.c_explore * math.sqrt(math.log(node.parent.visited) / n_visits)
if len(self.children) == 0:
return self.root_node
all_children = [child for children in self.children.values() for child in children]
return max(all_children, key=uct)
async def expand(self, node : Node, max_children=4):
async def expand(self, node: Node, max_children=5):
await node.expand(max_children)
if node not in self.children or not self.children[node]:
self.children[node] = node.children
return node.children
async def simulate(self, node : Node, role=None):
async def simulate(self, node: Node, role=None):
"Returns the reward for a random simulation (to completion) of `node`"
mcts_logger.log("MCTS", f"Start simulating node {node.id}:")
while node.children:
node = random.choice(node.children)
reward = await node.run_node(role)
mcts_logger.log("MCTS", f"Simulated node's reward: {reward}")
mcts_logger.log("MCTS", f"Simulated node's reward: {reward}")
return reward
def backpropagate(self, node : Node, reward):
def backpropagate(self, node: Node, reward):
child_node = node
node.update(reward)
node = node.parent
@ -264,34 +282,35 @@ class MCTS():
node.update(reward, child_node)
node, child_node = node.parent, node
def best_path(self, root : Node):
def best_path(self, root: Node):
best_child = root
best_score = 0
def bfs(node : Node, best_score, best_child : Node, split):
def bfs(node: Node, best_score, best_child: Node, split):
assert split in ["test_score", "dev_score"]
if node not in self.children:
return best_score, best_child
for child in self.children[node]:
score = child.normalized_reward[split]
print(child.id, score)
print(child.id, split, score)
if score > best_score:
best_score = score
best_child = child
best_score, best_child = bfs(child, best_score, best_child, split)
return best_score, best_child
_, best_child = bfs(root, best_score, best_child, "test_score")
_, dev_best_child = bfs(root, best_score, best_child, "dev_score")
return {"dev_best": dev_best_child,
"global_best": best_child}
return {"dev_best": dev_best_child, "global_best": best_child}
def get_num_simulations(self):
return self.root_node.visited
async def search(self, task, data_config, name,
rollouts, load_tree=False, low_is_better=False, reflection=False):
role, root = initialize_di_root_node(task, data_config, low_is_better=low_is_better, reflection=reflection, name=name)
async def search(self, task, data_config, name, rollouts, load_tree=False, low_is_better=False, reflection=False):
role, root = initialize_di_root_node(
task, data_config, low_is_better=low_is_better, reflection=reflection, name=name
)
self.root_node = root
tree_loaded = False
if load_tree:
@ -300,14 +319,14 @@ class MCTS():
mcts_logger.log("MCTS", f"Tree loaded: {tree_loaded}")
if not tree_loaded:
rollouts -= 2 # 2 rollouts for the initial tree
rollouts -= 2 # 2 rollouts for the initial tree
if rollouts < 0:
raise ValueError("Rollouts must be greater than 2 if there is no tree to load")
self.children[root] = []
reward = await self.simulate(root, role)
self.backpropagate(root, reward)
children = await self.expand(root)
#目前是随机选择1个后续可以改成多个
# 目前是随机选择1个后续可以改成多个
first_leaf = random.choice(children)
reward = await self.simulate(first_leaf)
self.backpropagate(first_leaf, reward)
@ -325,14 +344,13 @@ class MCTS():
mcts_logger.log("MCTS", f"Terminal node's reward: {reward}")
self.backpropagate(node, reward)
else:
if node.visited > 0:
if node.visited > 0:
children = await self.expand(node)
node = random.choice(children)
reward = await self.simulate(node)
self.backpropagate(node, reward)
return self.best_path(root)
def load_tree(self):
def load_children_node(node):
mcts_logger.log("MCTS", f"Load node {node.id}'s child: {node.children}")
@ -342,14 +360,15 @@ class MCTS():
child.load_node()
self.children[child] = child.children
load_children_node(child)
# Load all pkl files in the node_dir
all_pkl_files = os.listdir(self.root_node.state["node_dir"])
all_pkl_files = [f for f in all_pkl_files if f.endswith(".pkl")]
if os.path.exists(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl")):
with open(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl"), 'rb') as f:
with open(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl"), "rb") as f:
self.root_node = pickle.load(f)
self.children[self.root_node] = self.root_node.children
load_children_node(self.root_node)
if self.children:
return True
return False
return False

View file

@ -35,18 +35,15 @@ ### Budget
### 提示词使用
通过执行`dataset.py`中的`generate_task_requirement`函数获取提示词
- 通过执行`dataset.py`中的`generate_task_requirement`函数获取提示词
- 每一个数据集里有`dataset_info.json`里面的内容需要提供给baselines以保证公平
## 3. Evaluation
运行各个框架运行后框架需要提供Dev和Test的`dev_predictions.csv``test_predictions.csv` column name为target
两种评估方式
1. `evaluation.py` 提供pred和原始的gt1D iterable以及需要使用的metric返回evaluation score
2. 使用`CustomExperimenter`
- 使用`CustomExperimenter`
```
experimenter = CustomExperimenter(task="titanic")
score_dict = experimenter.evaluate_pred_files(dev_pred_path, test_pred_path)
@ -61,12 +58,24 @@ ### AIDE
提供github链接并说明使用的命令以及参数设置
### Autogluon
#### Setup
```
pip install -U pip
pip install -U setuptools wheel
CPU version of pytorch has smaller footprint - see installation instructions in
pytorch documentation - https://pytorch.org/get-started/locally/
pip install torch==2.3.1 torchvision==0.18.1 --index-url https://download.pytorch.org/whl/cpu
pip install autogluon
```
提供github链接并说明使用的命令以及参数设置
### Base DI
For setup, check 5.
- `python run_experiment.py --exp_mode base --task titanic`
- `python run_experiment.py --exp_mode base --task titanic --num_experiments 10`
### DI RandomSearch

View file

@ -1,12 +1,14 @@
import openml
from pathlib import Path
from sklearn.model_selection import train_test_split
import os
import json
import yaml
import pandas as pd
from expo.insights.solution_designer import SolutionDesigner
import asyncio
import json
import os
from pathlib import Path
import openml
import pandas as pd
import yaml
from sklearn.model_selection import train_test_split
from expo.insights.solution_designer import SolutionDesigner
BASE_USER_REQUIREMENT = """\
This is a {datasetname} dataset. Your goal is to predict the target column `{target_col}`.
@ -20,7 +22,7 @@ TASK_PROMPT = """\
**Attention**
1. Please do not leak the target label in any form during training.
2. Dev and Test sets do not have the target column.
3. You should perform transformations on all sets at the same step.
3. You should perform transformations on train, dev, and test sets at the same time (it's a good idea to define functions for this and avoid code repetition).
4. If labels are transformed during training, they should be transformed back to the original format before saving the predictions.
## Saving Dev and Test Predictions
@ -38,9 +40,9 @@ print("Train score:", train_score)
```
# Data dir
training: {train_path}
dev: {dev_path}
testing: {test_path}
training (with labels): {train_path}
dev (without labels): {dev_path}
testing (without labels): {test_path}
# Output dir
{output_dir}
@ -59,14 +61,12 @@ OPENML_DATASET_IDS = [
41980,
42225,
531,
# cls
41143,
31,
42733,
41162,
1067,
# multi cls
40498,
40982,
@ -79,14 +79,15 @@ CUSTOM_DATASETS = [
("04_titanic", "Survived"),
("05_house-prices-advanced-regression-techniques", "SalePrice"),
("06_santander-customer-transaction-prediction", "target"),
("07_icr-identify-age-related-conditions", "Class")
("07_icr-identify-age-related-conditions", "Class"),
]
def get_split_dataset_path(dataset_name, config):
datasets_dir = config['datasets_dir']
if dataset_name in config['datasets']:
dataset = config['datasets'][dataset_name]
data_path = os.path.join(datasets_dir, dataset['dataset'])
datasets_dir = config["datasets_dir"]
if dataset_name in config["datasets"]:
dataset = config["datasets"][dataset_name]
data_path = os.path.join(datasets_dir, dataset["dataset"])
split_datasets = {
"train": os.path.join(data_path, "split_train.csv"),
"dev": os.path.join(data_path, "split_dev.csv"),
@ -98,32 +99,39 @@ def get_split_dataset_path(dataset_name, config):
}
return split_datasets
else:
raise ValueError(f"Dataset {dataset_name} not found in config file. Available datasets: {config['datasets'].keys()}")
raise ValueError(
f"Dataset {dataset_name} not found in config file. Available datasets: {config['datasets'].keys()}"
)
def get_user_requirement(task_name, config):
datasets_dir = config['datasets_dir']
if task_name in config['datasets']:
dataset = config['datasets'][task_name]
data_path = os.path.join(datasets_dir, dataset['dataset'])
user_requirement = dataset['user_requirement']
datasets_dir = config["datasets_dir"]
if task_name in config["datasets"]:
dataset = config["datasets"][task_name]
data_path = os.path.join(datasets_dir, dataset["dataset"])
user_requirement = dataset["user_requirement"]
return data_path, user_requirement
else:
raise ValueError(f"Dataset {task_name} not found in config file. Available datasets: {config['datasets'].keys()}")
raise ValueError(
f"Dataset {task_name} not found in config file. Available datasets: {config['datasets'].keys()}"
)
def save_datasets_dict_to_yaml(datasets_dict):
with open("datasets.yaml", "w") as file:
yaml.dump(datasets_dict, file)
def create_dataset_dict(dataset):
dataset_dict = {
"dataset": dataset.name,
"user_requirement": dataset.create_base_requirement(),
"metric": dataset.get_metric(),
"target_col": dataset.target_col
"target_col": dataset.target_col,
}
return dataset_dict
def generate_task_requirement(task_name, data_config):
user_requirement = get_user_requirement(task_name, data_config)
split_dataset_path = get_split_dataset_path(task_name, data_config)
@ -132,19 +140,23 @@ def generate_task_requirement(task_name, data_config):
test_path = split_dataset_path["test_wo_target"]
work_dir = data_config["work_dir"]
output_dir = f"{work_dir}/{task_name}"
user_requirement = TASK_PROMPT.format(user_requirement=user_requirement,
train_path=train_path, dev_path=dev_path, test_path=test_path,
output_dir=output_dir)
user_requirement = TASK_PROMPT.format(
user_requirement=user_requirement,
train_path=train_path,
dev_path=dev_path,
test_path=test_path,
output_dir=output_dir,
)
print(user_requirement)
return user_requirement
class ExpDataset:
description : str = None
metadata : dict = None
dataset_dir : str = None
target_col : str = None
name : str = None
description: str = None
metadata: dict = None
dataset_dir: str = None
target_col: str = None
name: str = None
def __init__(self, name, dataset_dir, **kwargs):
self.name = name
@ -154,18 +166,23 @@ class ExpDataset:
self.save_dataset(target_col=self.target_col)
def check_dataset_exists(self):
fnames = ["split_train.csv", "split_dev.csv", "split_test.csv",
"split_dev_wo_target.csv", "split_dev_target.csv",
"split_test_wo_target.csv", "split_test_target.csv"]
fnames = [
"split_train.csv",
"split_dev.csv",
"split_test.csv",
"split_dev_wo_target.csv",
"split_dev_target.csv",
"split_test_wo_target.csv",
"split_test_target.csv",
]
for fname in fnames:
if not os.path.exists(Path(self.dataset_dir, self.name, fname)):
return False
return True
def check_datasetinfo_exists(self):
return os.path.exists(Path(self.dataset_dir, self.name, "dataset_info.json"))
def get_raw_dataset(self):
raw_dir = Path(self.dataset_dir, self.name, "raw")
if not os.path.exists(Path(raw_dir, "train.csv")):
@ -173,17 +190,17 @@ class ExpDataset:
else:
df = pd.read_csv(Path(raw_dir, "train.csv"))
return df
def get_dataset_info(self):
raw_df = pd.read_csv(Path(self.dataset_dir, self.name, "raw", "train.csv"))
metadata = {
'NumberOfClasses': raw_df[self.target_col].nunique(),
'NumberOfFeatures': raw_df.shape[1],
'NumberOfInstances': raw_df.shape[0],
'NumberOfInstancesWithMissingValues': int(raw_df.isnull().any(axis=1).sum()),
'NumberOfMissingValues': int(raw_df.isnull().sum().sum()),
'NumberOfNumericFeatures': raw_df.select_dtypes(include=['number']).shape[1],
'NumberOfSymbolicFeatures': raw_df.select_dtypes(include=['object']).shape[1],
"NumberOfClasses": raw_df[self.target_col].nunique(),
"NumberOfFeatures": raw_df.shape[1],
"NumberOfInstances": raw_df.shape[0],
"NumberOfInstancesWithMissingValues": int(raw_df.isnull().any(axis=1).sum()),
"NumberOfMissingValues": int(raw_df.isnull().sum().sum()),
"NumberOfNumericFeatures": raw_df.select_dtypes(include=["number"]).shape[1],
"NumberOfSymbolicFeatures": raw_df.select_dtypes(include=["object"]).shape[1],
}
df_head_text = raw_df.head().to_string(index=False)
@ -193,10 +210,10 @@ class ExpDataset:
"description": "",
"target_col": self.target_col,
"metadata": metadata,
"df_head": df_head_text
"df_head": df_head_text,
}
return dataset_info
def get_metric(self):
dataset_info = self.get_dataset_info()
num_classes = dataset_info["metadata"]["NumberOfClasses"]
@ -216,7 +233,6 @@ class ExpDataset:
return req
def save_dataset(self, target_col):
df = self.get_raw_dataset()
if not self.check_dataset_exists() or self.force_update:
print(f"Saving Dataset {self.name} in {self.dataset_dir}")
@ -249,25 +265,22 @@ class ExpDataset:
def split_and_save(self, df, target_col):
if not target_col:
raise ValueError("Target column not provided")
train, test = train_test_split(df, test_size=1-TRAIN_TEST_SPLIT, random_state=SEED)
train, dev = train_test_split(train, test_size=1-TRAIN_DEV_SPLIT, random_state=SEED)
train, test = train_test_split(df, test_size=1 - TRAIN_TEST_SPLIT, random_state=SEED)
train, dev = train_test_split(train, test_size=1 - TRAIN_DEV_SPLIT, random_state=SEED)
self.save_split_datasets(train, "train")
self.save_split_datasets(dev, "dev", target_col)
self.save_split_datasets(test, "test", target_col)
class OpenMLExpDataset(ExpDataset):
def __init__(self, name, dataset_dir, dataset_id, **kwargs):
self.dataset_id = dataset_id
self.dataset = openml.datasets.get_dataset(self.dataset_id,
download_data=False,
download_qualities=False,
download_features_meta_data=True)
self.dataset = openml.datasets.get_dataset(
self.dataset_id, download_data=False, download_qualities=False, download_features_meta_data=True
)
self.name = self.dataset.name
self.target_col = self.dataset.default_target_attribute
super().__init__(self.name, dataset_dir, target_col=self.target_col, **kwargs)
def get_raw_dataset(self):
dataset = self.dataset
@ -276,7 +289,7 @@ class OpenMLExpDataset(ExpDataset):
os.makedirs(raw_dir, exist_ok=True)
dataset_df.to_csv(Path(raw_dir, "train.csv"), index=False)
return dataset_df
def get_dataset_info(self):
dataset_info = super().get_dataset_info()
dataset = self.dataset
@ -290,12 +303,14 @@ class OpenMLExpDataset(ExpDataset):
# def __init__(self, name, dataset_dir, dataset_name, **kwargs):
# super().__init__(name, dataset_dir, **kwargs)
async def process_dataset(dataset, solution_designer, save_analysis_pool, datasets_dict):
if save_analysis_pool:
asyncio.run(solution_designer.generate_solutions(dataset.get_dataset_info(), dataset.name))
dataset_dict = create_dataset_dict(dataset)
datasets_dict["datasets"][dataset.name] = dataset_dict
if __name__ == "__main__":
datasets_dir = "D:/work/automl/datasets"
force_update = False

View file

@ -1,11 +1,12 @@
from sklearn.metrics import f1_score, accuracy_score, roc_auc_score, mean_squared_error
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error, roc_auc_score
def evaluate_score(pred, gt, metric):
if metric == "accuracy":
return accuracy_score(gt, pred)
elif metric == "f1":
unique_classes = np.unique(gt)
unique_classes = sorted(list(np.unique(gt)))
if 1 in unique_classes and 0 in unique_classes:
pos_label = 1
else:
@ -20,4 +21,4 @@ def evaluate_score(pred, gt, metric):
elif metric == "log rmse":
return mean_squared_error(np.log1p(gt), np.log1p(pred), squared=False)
else:
raise ValueError(f"Metric {metric} not supported")
raise ValueError(f"Metric {metric} not supported")

View file

@ -1,7 +1,7 @@
from expo.MCTS import Node, MCTS
import textwrap
from expo.MCTS import Node
NODE_TEMPLATE = """\
[Node {id}]
Plans:
@ -11,21 +11,23 @@ Score: {score}, Visits: {num_visits}
"""
def get_role_plans(role):
plans = role.planner.plan.tasks
instruct_plans = [f"{i+1}. {task.instruction}" for i, task in enumerate(plans)]
return instruct_plans
def get_tree_text(node : Node):
def get_tree_text(node: Node):
role_dict = {}
code_set = set()
def load_role(node):
if node.id not in role_dict:
role_dict[node.id] = node.load_role()
return role_dict[node.id]
def visualize_node(node : Node, previous_plans=None):
def visualize_node(node: Node, previous_plans=None):
role = load_role(node)
node_id = node.id
plans = role.planner.plan.tasks
@ -36,7 +38,9 @@ def get_tree_text(node : Node):
simulated = role.state_saved
score = f"avg score: {node.avg_value()}, simulated score: {node.raw_reward}"
num_visits = node.visited
return NODE_TEMPLATE.format(id=node_id, plans=instruct_plans_text, simulated=simulated, score=score, num_visits=num_visits)
return NODE_TEMPLATE.format(
id=node_id, plans=instruct_plans_text, simulated=simulated, score=score, num_visits=num_visits
)
def visualize_tree(node, depth=0, previous_plans=None):
text = ""
@ -46,9 +50,10 @@ def get_tree_text(node : Node):
code_set.update({task.instruction for task in role.planner.plan.tasks})
previous_plans = get_role_plans(role)
for child in node.children:
text += textwrap.indent(visualize_tree(child, depth+1, previous_plans), "\t")
text += textwrap.indent(visualize_tree(child, depth + 1, previous_plans), "\t")
return text
return visualize_tree(node), len(code_set)
num_simulations = node.visited
text = f"Number of simulations: {num_simulations}\n"
text += visualize_tree(node)
return text, len(code_set)

View file

@ -1,4 +0,0 @@
from .experimenter import Experimenter
from .mcts import MCTSExperimenter
from .aug import AugExperimenter
from .custom import CustomExperimenter

View file

@ -1,9 +1,8 @@
from experimenter import Experimenter
from expo.MCTS import create_initial_state
from expo.dataset import generate_task_requirement
from expo.utils import mcts_logger, load_execute_notebook, get_exp_pool_path
from expo.insights.instruction_generator import InstructionGenerator
from expo.research_assistant import ResearchAssistant
from expo.utils import get_exp_pool_path
EXPS_PROMPT = """
When doing the tasks, you can refer to the insights below:
@ -12,14 +11,12 @@ When doing the tasks, you can refer to the insights below:
"""
class AugExperimenter(Experimenter):
result_path : str = "results/aug"
result_path: str = "results/aug"
async def run_experiment(self):
state = create_initial_state(self.args.task, start_task_id=1, data_config=self.data_config, low_is_better=self.args.low_is_better, name="")
user_requirement = state["requirement"]
# state = create_initial_state(self.args.task, start_task_id=1, data_config=self.data_config, low_is_better=self.args.low_is_better, name="")
user_requirement = self.state["requirement"]
exp_pool_path = get_exp_pool_path(self.args.task, self.data_config, pool_name="ds_analysis_pool")
exp_pool = InstructionGenerator.load_analysis_pool(exp_pool_path)
if self.args.aug_mode == "single":
@ -31,30 +28,27 @@ class AugExperimenter(Experimenter):
exps = [exp_set_text] * self.args.num_experiments
else:
raise ValueError(f"Invalid mode: {self.args.aug_mode}")
results = []
for i in range(self.args.num_experiments):
di = ResearchAssistant(node_id=str(i), use_reflection=self.args.reflection)
di.role_dir = f"{di.role_dir}_{self.args.task}"
requirement = user_requirement + EXPS_PROMPT.format(experience=exps[i])
print(requirement)
await di.run(requirement)
score_dict = await di.get_score()
score_dict = self.evaluate(score_dict, state)
results.append({
"idx": i,
"score_dict": score_dict,
"aug_mode": self.args.aug_mode,
"insights" : exps[i],
"user_requirement": requirement,
"args": vars(self.args)
})
score_dict = await self.run_di(di, requirement)
results.append(
{
"idx": i,
"score_dict": score_dict,
"aug_mode": self.args.aug_mode,
"insights": exps[i],
"user_requirement": requirement,
"args": vars(self.args),
}
)
scores = [result["score_dict"]["test_score"] for result in results]
avg_score = sum(scores) / len(scores)
best_score = max(scores) if not self.args.low_is_better else min(scores)
best_score_idx = scores.index(best_score)
results.insert(0, {"avg_score": avg_score, "best_score": best_score, "best_score_idx": best_score_idx})
self.save_result(results)

View file

@ -0,0 +1,33 @@
from autogluon.tabular import TabularDataset, TabularPredictor
from expo.experimenter.custom import CustomExperimenter
class AGRunner:
preset = "best_quality"
time_limit = 500
def __init__(self, datasets):
self.datasets = datasets
def run(self):
train_path = self.datasets["train"]
test_wo_target_path = self.datasets["test_wo_target"]
dev_wo_target_path = self.datasets["dev_wo_target"]
target_col = self.state["dataset_config"]["target_col"]
train_data = TabularDataset(train_path)
test_data = TabularDataset(test_wo_target_path)
dev_data = TabularDataset(dev_wo_target_path)
predictor = TabularPredictor(label=target_col).fit(train_data, presets=self.preset, time_limit=self.time_limit)
test_preds = predictor.predict(test_data)
dev_preds = predictor.predict(dev_data)
return {"test_preds": test_preds, "dev_preds": dev_preds}
class GluonExperimenter(CustomExperimenter):
result_path: str = "results/autogluon"
def __init__(self, args, **kwargs):
super().__init__(args, **kwargs)
self.framework = AGRunner(self.datasets)

View file

@ -1,35 +1,36 @@
from expo.experimenter import Experimenter
from expo.MCTS import create_initial_state
from expo.evaluation.evaluation import evaluate_score
import pandas as pd
import os
import pandas as pd
from expo.evaluation.evaluation import evaluate_score
from expo.experimenter import Experimenter
from expo.MCTS import create_initial_state
class CustomExperimenter(Experimenter):
result_path : str = "results/custom"
result_path: str = "results/custom"
def __init__(self, args, **kwargs):
super().__init__(args, **kwargs)
self.framework = kwargs["framework"] # todo
self.framework = kwargs["framework"] # todo
self.task = kwargs.get("task", self.args.task)
self.low_is_better = kwargs.get("low_is_better", self.args.low_is_better)
self.name = kwargs.get("name", "")
self.result_path = f"results/custom_{self.name}"
self.state = create_initial_state(self.task, start_task_id=1, data_config=self.data_config, low_is_better=self.low_is_better, name=self.name)
async def run_experiment(self):
self.state = create_initial_state(
self.task, start_task_id=1, data_config=self.data_config, low_is_better=self.low_is_better, name=self.name
)
def run_experiment(self):
user_requirement = self.state["requirement"]
preds = await self.framework.run(user_requirement)
preds = self.framework.run(user_requirement)
test_preds = preds["test_preds"]
dev_preds = preds["dev_preds"]
score_dict = {
"dev_score": self.evaluate_predictions(dev_preds, "dev"),
"test_score": self.evaluate_predictions(test_preds, "test")
}
results = {
"score_dict": score_dict,
"user_requirement": user_requirement,
"args": vars(self.args)
"test_score": self.evaluate_predictions(test_preds, "test"),
}
results = {"score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)}
self.save_result(results)
def evaluate_pred_files(self, dev_pred_path, test_pred_path):
@ -37,7 +38,7 @@ class CustomExperimenter(Experimenter):
test_preds = pd.read_csv(test_pred_path)["target"]
score_dict = {
"dev_score": self.evaluate_score(dev_preds, "dev"),
"test_score": self.evaluate_score(test_preds, "test")
"test_score": self.evaluate_score(test_preds, "test"),
}
return score_dict
@ -46,8 +47,7 @@ class CustomExperimenter(Experimenter):
gt_path = os.path.join(self.state["datasets_dir"][f"{split}_target"])
gt = pd.read_csv(gt_path)["target"]
score = evaluate_score(preds, gt, metric)
return score
return score
def load_datasets(self):
train_path = self.state["datasets_dir"]["train"]
@ -57,4 +57,3 @@ class CustomExperimenter(Experimenter):
dev = pd.read_csv(dev_path)
test = pd.read_csv(test_path)
return train, dev, test

View file

@ -1,43 +1,77 @@
from expo.utils import DATA_CONFIG
import os
import pandas as pd
from expo.evaluation.evaluation import evaluate_score
import datetime
import json
import os
import pandas as pd
from expo.evaluation.evaluation import evaluate_score
from expo.MCTS import create_initial_state
from expo.research_assistant import ResearchAssistant
from expo.utils import DATA_CONFIG
class Experimenter:
result_path : str = "results/base"
result_path: str = "results/base"
data_config = DATA_CONFIG
def __init__(self, args, **kwargs):
self.args = args
self.start_time = datetime.datetime.now().strftime("%Y%m%d%H%M")
self.state = create_initial_state(
self.args.task,
start_task_id=1,
data_config=self.data_config,
low_is_better=self.args.low_is_better,
name="",
)
async def run_di(self, di, user_requirement):
max_retries = 3
num_runs = 1
run_finished = False
while num_runs <= max_retries and not run_finished:
try:
await di.run(user_requirement)
score_dict = await di.get_score()
score_dict = self.evaluate(score_dict, self.state)
run_finished = True
except Exception as e:
print(f"Error: {e}")
num_runs += 1
if not run_finished:
score_dict = {"train_score": -1, "dev_score": -1, "test_score": -1, "score": -1}
return score_dict
async def run_experiment(self):
state = create_initial_state(self.args.task, start_task_id=1, data_config=self.data_config, low_is_better=self.args.low_is_better, name="")
state = self.state
user_requirement = state["requirement"]
results = []
for i in range(self.args.num_experiments):
di = ResearchAssistant(node_id="0", use_reflection=self.args.reflection)
await di.run(user_requirement)
score_dict = await di.get_score()
score_dict = self.evaluate(score_dict, state)
results.append({
"idx": i,
"score_dict": score_dict,
"user_requirement": user_requirement,
"args": vars(self.args)
})
scores = [result["score_dict"]["test_score"] for result in results]
avg_score = sum(scores) / len(scores)
best_score = max(scores) if not self.args.low_is_better else min(scores)
best_score_idx = scores.index(best_score)
results.insert(0, {"avg_score": avg_score, "best_score": best_score, "best_score_idx": best_score_idx})
score_dict = await self.run_di(di, user_requirement)
results.append(
{"idx": i, "score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)}
)
self.save_result(results) # save intermediate results
dev_scores = [result["score_dict"]["dev_score"] for result in results]
best_dev_score = max(dev_scores) if not self.args.low_is_better else min(dev_scores)
best_score_idx = dev_scores.index(best_dev_score)
test_scores = [result["score_dict"]["test_score"] for result in results]
avg_score = sum(test_scores) / len(test_scores)
global_best_score = max(test_scores) if not self.args.low_is_better else min(test_scores)
results.insert(
0,
{
"best_dev_score": best_dev_score,
"best_score_idx": best_score_idx,
"best_test_score": test_scores[best_score_idx],
"avg_test_score": avg_score,
"best_score": global_best_score,
},
)
self.save_result(results)
def evaluate_prediction(self, split, state):
@ -49,8 +83,9 @@ class Experimenter:
preds.to_csv(pred_node_path, index=False)
gt = pd.read_csv(gt_path)["target"]
metric = state["dataset_config"]["metric"]
os.remove(pred_path)
return evaluate_score(preds, gt, metric)
def evaluate(self, score_dict, state):
scores = {
"dev_score": self.evaluate_prediction("dev", state),
@ -59,8 +94,15 @@ class Experimenter:
score_dict.update(scores)
return score_dict
def save_result(self, result):
end_time = datetime.datetime.now().strftime("%Y%m%d%H%M")
time_info = {
"start_time": self.start_time,
"end_time": end_time,
"duration (minutes)": float(end_time) - float(self.start_time),
}
result = result.copy()
result.insert(0, time_info)
os.makedirs(self.result_path, exist_ok=True)
with open(f"{self.result_path}/{self.args.exp_mode}-{self.args.task}_{self.start_time}.json", "w") as f:
json.dump(result, f, indent=4)

View file

@ -1,45 +1,47 @@
from expo.experimenter import Experimenter
from expo.dataset import generate_task_requirement
from expo.MCTS import MCTS
from expo.evaluation.visualize_mcts import get_tree_text
from expo.experimenter import Experimenter
from expo.MCTS import MCTS
class MCTSExperimenter(Experimenter):
result_path : str = "results/mcts"
result_path: str = "results/mcts"
async def run_experiment(self):
mcts = MCTS(root_node=None, max_depth=5)
best_nodes = await mcts.search(self.args.task, self.data_config,
low_is_better=self.args.low_is_better,
load_tree=self.args.load_tree,
reflection=self.args.reflection,
rollouts=self.args.rollouts,
name=self.args.name)
best_nodes = await mcts.search(
self.args.task,
self.data_config,
low_is_better=self.args.low_is_better,
load_tree=self.args.load_tree,
reflection=self.args.reflection,
rollouts=self.args.rollouts,
name=self.args.name,
)
best_node = best_nodes["global_best"]
dev_best_node = best_nodes["dev_best"]
text, num_generated_codes = get_tree_text(mcts.root_node)
text += f"Generated {num_generated_codes} unique codes.\n"
text += f"Best node: {best_node}, score: {best_node.raw_reward}\n"
text += f"Dev best node: {dev_best_node}, score: {dev_best_node.raw_reward}\n"
print(text)
if self.args.rollouts > 0:
self.save_tree(text)
self.save_tree(text)
results = {
results = [
{
"best_node": best_node.id,
"best_node_score": best_node.raw_reward,
"dev_best_node": dev_best_node.id,
"dev_best_node_score": dev_best_node.raw_reward,
"num_generated_codes": num_generated_codes,
"user_requirement": best_node.state["requirement"],
"args": vars(self.args)
"tree_text": text,
"args": vars(self.args),
}
self.save_result(results)
]
self.save_result(results)
def save_tree(self, tree_text):
fpath = f"{self.result_path}/{self.args.task}_tree_{self.args.name}.txt"
with open(fpath, "w") as f:
f.write(tree_text)

View file

@ -1,3 +1,10 @@
import json
import random
from expo.utils import clean_json_from_rsp, load_data_config, mcts_logger
from metagpt.llm import LLM
from metagpt.schema import Message
REFLECTION_SYSTEM_MSG = "As a Kaggle grandmaster participating in a competition, you need to analyze your experience and propose evolutionary points that are more likely to improve the performance of baseline code."
CHANGE_INSTRUCTION = """
@ -18,12 +25,6 @@ Rewrite the original instruction according to the insights
```
"""
import re
import random
import json
from metagpt.llm import LLM
from metagpt.schema import Message
from expo.utils import load_data_config, mcts_logger, clean_json_from_rsp
DATA_CONFIG = load_data_config()
@ -31,7 +32,7 @@ class InstructionGenerator:
data_config = DATA_CONFIG
@staticmethod
def load_json_data(json_dir):
def load_json_data(json_dir):
with open(json_dir, "r") as file:
json_data = json.load(file)
return json_data
@ -39,7 +40,7 @@ class InstructionGenerator:
@staticmethod
def _random_sample(analysis, num_samples):
return random.sample(analysis, num_samples)
@staticmethod
def sample_instruction_set(data):
data_dict = {}
@ -52,12 +53,12 @@ class InstructionGenerator:
for task_id in sorted(data_dict.keys()):
instruction_set.append(random.choice(data_dict[task_id]))
return instruction_set
@staticmethod
def format_output(rsp):
rsp_list = []
new_data = []
rsp_list.append(rsp)
new_data = []
rsp_list.append(rsp)
for item in rsp_list:
item_dict = json.loads(item)
data = {
@ -83,21 +84,19 @@ class InstructionGenerator:
new_instructions = []
if len(data) == 0:
mcts_logger.log("MCTS", f"No insights available for task {task_id}")
return [original_instruction] # Return the original instruction if no insights are available
return [original_instruction] # Return the original instruction if no insights are available
for item in data[:max_num]:
insights = item["Analysis"]
new_instruction = await InstructionGenerator.generate_new_instruction(original_instruction, insights)
new_instructions.append(new_instruction)
return new_instructions
@staticmethod
async def generate_new_instruction(original_instruction, insights):
prompt = CHANGE_INSTRUCTION.format(instruction=original_instruction, insights=insights)
llm = LLM()
context = llm.format_msg([Message(content=prompt, role="user")])
llm_response = await llm.aask(
context, system_msgs=[REFLECTION_SYSTEM_MSG]
)
context = llm.format_msg([Message(content=prompt, role="user")])
llm_response = await llm.aask(context, system_msgs=[REFLECTION_SYSTEM_MSG])
rsp = clean_json_from_rsp(llm_response)
new_instruction = json.loads(rsp)["New Instruction"]
return new_instruction
return new_instruction

View file

@ -1,10 +1,7 @@
import re
import random
import json
from metagpt.llm import LLM
from metagpt.schema import Message
from expo.utils import clean_json_from_rsp, load_data_config
from expo.utils import clean_json_from_rsp, load_data_config
from metagpt.llm import LLM
DATA_CONFIG = load_data_config()
@ -72,56 +69,50 @@ Make sure each method is independent and can be implemented separately.
"""
KEY_DATASET_FEATURES = [
'NumberOfClasses',
'NumberOfFeatures',
'NumberOfInstances',
'NumberOfInstancesWithMissingValues',
'NumberOfMissingValues',
'NumberOfNumericFeatures',
'NumberOfSymbolicFeatures'
"NumberOfClasses",
"NumberOfFeatures",
"NumberOfInstances",
"NumberOfInstancesWithMissingValues",
"NumberOfMissingValues",
"NumberOfNumericFeatures",
"NumberOfSymbolicFeatures",
]
TASK_TO_ID = {
"EDA": 1,
"Data Preprocessing": 2,
"Feature Engineering": 3,
"Model Training": 4,
"Model Evaluation": 5
}
TASK_TO_ID = {"EDA": 1, "Data Preprocessing": 2, "Feature Engineering": 3, "Model Training": 4, "Model Evaluation": 5}
class SolutionDesigner:
data_dir : str= DATA_CONFIG["datasets_dir"]
data_dir: str = DATA_CONFIG["datasets_dir"]
async def generate_solutions(self, dataset_info, dataset_name):
llm = LLM()
context = DATASET_INSIGHT_PROMPT.format(dataset=dataset_info["description"],
metadata=self.metadata_builder(dataset_info["metadata"]),
head=dataset_info["df_head"])
context = DATASET_INSIGHT_PROMPT.format(
dataset=dataset_info["description"],
metadata=self.metadata_builder(dataset_info["metadata"]),
head=dataset_info["df_head"],
)
rsp = await llm.aask(context)
rsp = clean_json_from_rsp(rsp)
analysis_pool = self.process_analysis_pool(json.loads(rsp))
dataset_path = f"{self.data_dir}/{dataset_name}"
self.save_analysis_pool(dataset_path, analysis_pool)
def process_analysis_pool(self, insights_rsp):
analysis_pool = []
for task_type_insights in insights_rsp:
task_type = task_type_insights["task_type"]
for insight in task_type_insights["insights"]:
analysis_pool.append({"Analysis": insight, "Category": task_type, "task_id": TASK_TO_ID[task_type]})
analysis_pool.append({"Analysis": insight, "Category": task_type, "task_id": TASK_TO_ID[task_type]})
return analysis_pool
def metadata_builder(self, qualities):
metadata = {}
for key in KEY_DATASET_FEATURES:
metadata[key] = qualities.get(key, "N/A")
metadata_text = json.dumps(metadata, indent=4)
return metadata_text
def save_analysis_pool(self, dataset_path, analysis_pool):
fpath = f"{dataset_path}/ds_analysis_pool.json"
with open(fpath, "w") as file:
json.dump(analysis_pool, file, indent=4)

View file

@ -1,19 +1,16 @@
from __future__ import annotations
import json
import os
from pydantic import model_validator
from expo.utils import mcts_logger, save_notebook
from metagpt.actions.di.write_analysis_code import WriteAnalysisCode
from metagpt.const import SERDESER_PATH
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.schema import Message, Task, TaskResult
from metagpt.strategy.task_type import TaskType
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.utils.common import CodeParser
from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info
from metagpt.const import MESSAGE_ROUTE_TO_ALL, SERDESER_PATH
from expo.utils import mcts_logger, save_notebook
from pydantic import Field, model_validator
from metagpt.actions.di.write_analysis_code import CheckData, WriteAnalysisCode
import re
import os
from metagpt.utils.common import CodeParser, write_json_file
EXTRACT_SCORE_PROMPT = """
# Code:
@ -36,39 +33,48 @@ If you cannot find the scores, please still return a dictionary with the keys 't
```
"""
class ResearchAssistant(DataInterpreter):
node_id: str = "0"
start_task_id: int = 1
state_saved : bool = False
role_dir : str = SERDESER_PATH.joinpath("team", "environment", "roles", f"Experimenter")
state_saved: bool = False
role_dir: str = SERDESER_PATH.joinpath("team", "environment", "roles", "Experimenter")
def get_node_name(self):
return f"Node-{self.node_id}"
def get_next_instruction(self):
return self.planner.plan.tasks[self.start_task_id]
def change_next_instruction(self, new_instruction):
if new_instruction is not None:
self.planner.plan.task_map[str(self.start_task_id)].instruction = new_instruction
self.remap_tasks()
def update_til_start_task(self, role: ResearchAssistant, backward: bool = True):
if backward:
# make sure the previous task instructions are matched
assert self.start_task_id == role.start_task_id - 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
assert (
self.start_task_id == role.start_task_id - 1
), f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
for i in range(self.start_task_id):
if self.planner.plan.task_map[str(self.start_task_id)].instruction != role.planner.plan.task_map[str(self.start_task_id)].instruction:
if (
self.planner.plan.task_map[str(self.start_task_id)].instruction
!= role.planner.plan.task_map[str(self.start_task_id)].instruction
):
mcts_logger.info("Previous task instructions not matched")
self.remap_tasks()
return
# copy new role's task (self.start_task_id) to current role
self.planner.plan.task_map[str(self.start_task_id)] = role.planner.plan.task_map[str(self.start_task_id)].model_copy()
self.planner.plan.task_map[str(self.start_task_id)] = role.planner.plan.task_map[
str(self.start_task_id)
].model_copy()
self.remap_tasks()
else:
assert self.start_task_id == role.start_task_id + 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
assert (
self.start_task_id == role.start_task_id + 1
), f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
if int(role.planner.plan.current_task_id) > self.start_task_id:
for i in range(role.start_task_id):
self.planner.plan.task_map[str(i)] = role.planner.plan.task_map[str(i)].model_copy()
@ -86,11 +92,10 @@ class ResearchAssistant(DataInterpreter):
json_block = CodeParser.parse_code(block=None, text=rsp)
score_dict = json.loads(json_block)
return score_dict
@model_validator(mode="after")
def set_plan_and_tool(self) -> "Interpreter":
if self.planner.plan.goal != '':
if self.planner.plan.goal != "":
self.set_actions([WriteAnalysisCode])
self._set_state(0)
print("Plan already exists, skipping initialization.")
@ -116,17 +121,17 @@ class ResearchAssistant(DataInterpreter):
self.state_saved = True
mcts_logger.log("MCTS", f"Saving state at task {self.start_task_id}")
else:
mcts_logger.log("MCTS", f"Static Saving")
mcts_logger.log("MCTS", "Static Saving")
stg_path = self.role_dir
name = self.get_node_name()
role_path = os.path.join(stg_path, f"{name}.json")
# 将状态保存为 JSON 文件
write_json_file(role_path, self.model_dump())
def remap_tasks(self):
self.planner.plan.tasks = [self.planner.plan.task_map[task_id] for task_id in sorted(self.planner.plan.task_map.keys())]
self.planner.plan.tasks = [
self.planner.plan.task_map[task_id] for task_id in sorted(self.planner.plan.task_map.keys())
]
async def run(self, with_message=None) -> Message | None:
"""Observe, and think and act based on the results of the observation"""
@ -138,13 +143,9 @@ class ResearchAssistant(DataInterpreter):
self.rc.working_memory.clear()
self.working_memory.clear()
# self.rc.todo = WriteAnalysisCode()
rsp = await self.react()
rsp = await self.react()
# 发送响应消息给 Environment 对象,以便它将消息传递给订阅者
self.set_todo(None)
self.publish_message(rsp)
return rsp
return await super().run(with_message)

View file

@ -1,15 +1,17 @@
import os
from expo.research_assistant import ResearchAssistant
import argparse
import asyncio
from expo.utils import DATA_CONFIG, get_exp_pool_path
import datetime
import json
import os
import pandas as pd
from expo.dataset import generate_task_requirement
from expo.evaluation.evaluation import evaluate_score
from expo.insights.instruction_generator import InstructionGenerator
from expo.MCTS import create_initial_state
from expo.evaluation.evaluation import evaluate_score
import json
import argparse
import pandas as pd
import datetime
from expo.research_assistant import ResearchAssistant
from expo.utils import DATA_CONFIG, get_exp_pool_path
EXPS_PROMPT = """
When doing the tasks, you can refer to the insights below:
@ -18,13 +20,14 @@ When doing the tasks, you can refer to the insights below:
"""
data_config = DATA_CONFIG
def evaluate_test(score, state):
datetime_text = datetime.datetime.now().strftime("%Y%m%d%H%M")
task_name = state["task"]
prediction_fpath = os.path.join(state["work_dir"], task_name, "predictions.csv")
predictions = pd.read_csv(prediction_fpath)["target"]
# copy predictions.csv to the node_dir
predictions_node_fpath = os.path.join("results", f"{task_name}-{datetime_text}-predictions.csv")
predictions.to_csv(predictions_node_fpath, index=False)
# load test_target.csv
@ -35,8 +38,6 @@ def evaluate_test(score, state):
return score
async def main(task_name, use_reflection=True, mode="single", num_experiments=2):
"""
mode: single or set
@ -44,8 +45,10 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2)
set: sample a set of instructions
"""
low_is_better = False
state = create_initial_state(task_name, start_task_id=1, data_config=data_config, low_is_better=low_is_better, name="")
state = create_initial_state(
task_name, start_task_id=1, data_config=data_config, low_is_better=low_is_better, name=""
)
user_requirement = generate_task_requirement(task_name, data_config)
exp_pool_path = get_exp_pool_path(task_name, data_config, pool_name="ds_analysis_pool")
exp_pool = InstructionGenerator.load_analysis_pool(exp_pool_path)
@ -58,7 +61,7 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2)
exps = [exp_set_text] * num_experiments
else:
raise ValueError(f"Invalid mode: {mode}")
scores = []
for i in range(num_experiments):
di = ResearchAssistant(node_id=str(i), use_reflection=use_reflection)
@ -70,16 +73,18 @@ async def main(task_name, use_reflection=True, mode="single", num_experiments=2)
score = evaluate_test(score, state)
scores.append(score)
with open(f"results/{task_name}_scores.json", "w") as f:
# save scores and corresponding insights
results = {"avg_score": sum([score["test_score"] for score in scores if score])/num_experiments,
"max_score": max([score["test_score"] for score in scores]),
"scores": scores, "insights": exps}
results = {
"avg_score": sum([score["test_score"] for score in scores if score]) / num_experiments,
"max_score": max([score["test_score"] for score in scores]),
"scores": scores,
"insights": exps,
}
json.dump(results, f, indent=4)
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--task", type=str, default="titanic")
@ -90,8 +95,9 @@ def parse_args():
parser.add_argument("--num_experiments", type=int, default=2)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
asyncio.run(main(args.task, use_reflection=args.use_reflection, mode=args.mode, num_experiments=args.num_experiments))
asyncio.run(
main(args.task, use_reflection=args.use_reflection, mode=args.mode, num_experiments=args.num_experiments)
)

View file

@ -1,6 +1,12 @@
from expo.experimenter import MCTSExperimenter, Experimenter, AugExperimenter, CustomExperimenter
import asyncio
import argparse
import asyncio
from expo.experimenter import (
AugExperimenter,
CustomExperimenter,
Experimenter,
MCTSExperimenter,
)
def get_args():
@ -19,6 +25,7 @@ def get_mcts_args(parser):
parser.set_defaults(load_tree=False)
parser.add_argument("--rollouts", type=int, default=5)
def get_aug_exp_args(parser):
parser.add_argument("--aug_mode", type=str, default="single", choices=["single", "set"])
parser.add_argument("--num_experiments", type=int, default=1)
@ -31,7 +38,7 @@ def get_di_args(parser):
parser.add_argument("--reflection", dest="reflection", action="store_true")
parser.add_argument("--no_reflection", dest="reflection", action="store_false")
parser.set_defaults(reflection=True)
async def main(args):
if args.exp_mode == "mcts":
@ -46,6 +53,7 @@ async def main(args):
raise ValueError(f"Invalid exp_mode: {args.exp_mode}")
await experimenter.run_experiment()
if __name__ == "__main__":
args = get_args()
asyncio.run(main(args))
asyncio.run(main(args))

View file

@ -1,10 +1,9 @@
from expo.MCTS import MCTS, Node, initialize_di_root_node
from expo.utils import load_data_config
from expo.dataset import generate_task_requirement
import argparse
import asyncio
from expo.evaluation.visualize_mcts import get_tree_text
import asyncio
import argparse
from expo.MCTS import MCTS
from expo.utils import load_data_config
def get_args():
@ -35,9 +34,17 @@ if __name__ == "__main__":
# asyncio.run(root_node.run_node())
mcts = MCTS(root_node=None, max_depth=5)
best_nodes = asyncio.run(mcts.search(args.task, data_config,
low_is_better=args.low_is_better, load_tree=args.load_tree,
reflection=args.reflection, rollouts=args.rollouts, name=args.name))
best_nodes = asyncio.run(
mcts.search(
args.task,
data_config,
low_is_better=args.low_is_better,
load_tree=args.load_tree,
reflection=args.reflection,
rollouts=args.rollouts,
name=args.name,
)
)
best_node = best_nodes["global_best"]
dev_best_node = best_nodes["dev_best"]
text, num_generated_codes = get_tree_text(mcts.root_node)
@ -49,5 +56,3 @@ if __name__ == "__main__":
f.write(f"Best node: {best_node}, score: {best_node.raw_reward}\n")
f.write(f"Dev best node: {dev_best_node}, score: {dev_best_node.raw_reward}\n")
f.write(text)

View file

@ -1,50 +1,58 @@
import yaml
from metagpt.roles.role import Role
from metagpt.actions.di.execute_nb_code import ExecuteNbCode
# from nbclient import NotebookClient
from nbformat.notebooknode import NotebookNode
import nbformat
from pathlib import Path
from loguru import logger as _logger
from datetime import datetime
import sys
import os
import re
import sys
from datetime import datetime
from pathlib import Path
import nbformat
import yaml
from loguru import logger as _logger
# from nbclient import NotebookClient
from nbformat.notebooknode import NotebookNode
from metagpt.roles.role import Role
def load_data_config(file_path="data.yaml"):
with open(file_path, 'r') as stream:
with open(file_path, "r") as stream:
data_config = yaml.safe_load(stream)
return data_config
DATA_CONFIG = load_data_config()
def get_mcts_logger():
print_level = "INFO"
print_level2 = "MCTS"
logfile_level="MCTS"
logfile_level = "MCTS"
name: str = None
current_date = datetime.now()
formatted_date = current_date.strftime("%Y%m%d")
log_name = f"{name}_{formatted_date}" if name else formatted_date # name a log with prefix name
_logger.remove()
new_level = _logger.level(logfile_level, color="<green>", no=25)
_logger.level(logfile_level, color="<green>", no=25)
_logger.add(sys.stderr, level=print_level)
_logger.add(sys.stderr, level=print_level2)
_logger.add(Path(DATA_CONFIG["work_dir"]) / DATA_CONFIG["role_dir"] / f"{log_name}.txt", level=logfile_level)
_logger.propagate = False
return _logger
mcts_logger = get_mcts_logger()
def get_exp_pool_path(task_name, data_config, pool_name="analysis_pool"):
datasets_dir = data_config['datasets_dir']
if task_name in data_config['datasets']:
dataset = data_config['datasets'][task_name]
data_path = os.path.join(datasets_dir, dataset['dataset'])
datasets_dir = data_config["datasets_dir"]
if task_name in data_config["datasets"]:
dataset = data_config["datasets"][task_name]
data_path = os.path.join(datasets_dir, dataset["dataset"])
else:
raise ValueError(f"Dataset {task_name} not found in config file. Available datasets: {data_config['datasets'].keys()}")
raise ValueError(
f"Dataset {task_name} not found in config file. Available datasets: {data_config['datasets'].keys()}"
)
exp_pool_path = os.path.join(data_path, f"{pool_name}.json")
return exp_pool_path
@ -60,7 +68,6 @@ def change_plan(role, plan):
if not finished:
tasks[i].plan = plan
return finished
def is_cell_to_delete(cell: NotebookNode) -> bool:
@ -82,12 +89,14 @@ def process_cells(nb: NotebookNode) -> NotebookNode:
nb["cells"] = new_cells
return nb
def save_notebook(role: Role, save_dir: str = "", name: str = ""):
save_dir = Path(save_dir)
nb = process_cells(role.execute_code.nb)
file_path = save_dir / f"{name}.ipynb"
nbformat.write(nb, file_path)
async def load_execute_notebook(role):
tasks = role.planner.plan.tasks
codes = [task.code for task in tasks if task.code]
@ -99,6 +108,7 @@ async def load_execute_notebook(role):
print("Finish executing the loaded notebook")
return executor
def clean_json_from_rsp(text):
pattern = r"```json(.*?)```"
matches = re.findall(pattern, text, re.DOTALL)
@ -106,4 +116,4 @@ def clean_json_from_rsp(text):
json_str = "\n".join(matches)
return json_str
else:
return ""
return ""