Merge branch 'experimenter' into 'expo'

Experimenter Update

See merge request agents/exp_optimizer!5
This commit is contained in:
林义章 2024-09-09 06:13:07 +00:00
commit e3fccce73d
13 changed files with 205 additions and 78 deletions

9
expo/Greedy.py Normal file
View file

@ -0,0 +1,9 @@
from expo.MCTS import MCTS
class Greedy(MCTS):
def best_child(self):
if len(self.children) == 0:
return self.root_node
all_children = [child for children in self.children.values() for child in children]
return max(all_children, key=lambda x: x.normalized_reward.get("dev_score", 0))

View file

@ -6,7 +6,7 @@ import random
import numpy as np
import pandas as pd
from expo.dataset import generate_task_requirement, get_split_dataset_path
from expo.data.dataset import generate_task_requirement, get_split_dataset_path
from expo.evaluation.evaluation import evaluate_score
from expo.insights.instruction_generator import InstructionGenerator
from expo.research_assistant import ResearchAssistant
@ -143,6 +143,7 @@ class Node:
role.state_saved = False
role.change_next_instruction(self.action)
mcts_logger.log("MCTS", f"Saving new role: {role.node_id}")
role = role.model_copy()
role.save_state(static_save=True)
async def expand(self, max_children):
@ -165,18 +166,6 @@ class Node:
node.save_new_role(new_role)
self.add_child(node)
# def evaluate_test(self):
# prediction_fpath = os.path.join(self.state["work_dir"], self.state["task"], "predictions.csv")
# predictions = pd.read_csv(prediction_fpath)["target"]
# # copy predictions.csv to the node_dir
# predictions_node_fpath = os.path.join(self.state["node_dir"], "Node-{self.id}-predictions.csv")
# predictions.to_csv(predictions_node_fpath, index=False)
# # load test_target.csv
# split_datasets_dir = self.state["datasets_dir"]
# gt = pd.read_csv(os.path.join(split_datasets_dir["test_target"]))["target"]
# metric = self.state["dataset_config"]["metric"]
# return evaluate_score(predictions, gt, metric)
def evaluate_prediction(self, split):
pred_path = os.path.join(self.state["work_dir"], self.state["task"], f"{split}_predictions.csv")
pred_node_path = os.path.join(self.state["node_dir"], f"Node-{self.id}-{split}_predictions.csv")
@ -191,6 +180,7 @@ class Node:
def evaluate_simulation(self, score_dict):
scores = {"dev_score": self.evaluate_prediction("dev"), "test_score": self.evaluate_prediction("test")}
scores["score"] = scores["dev_score"]
score_dict.update(scores)
return score_dict
@ -330,14 +320,10 @@ class MCTS:
self.children[root] = []
reward = await self.simulate(root, role)
self.backpropagate(root, reward)
children = await self.expand(root)
# 目前是随机选择1个后续可以改成多个
first_leaf = random.choice(children)
reward = await self.simulate(first_leaf)
self.backpropagate(first_leaf, reward)
node, reward = await self.expand_and_simulate(root)
# self.backpropagate(node, reward)
else:
root = self.root_node
# 后续迭代使用UCT进行选择expand并模拟和反向传播
for _ in range(rollouts): # number of rollouts
mcts_logger.log("MCTS", f"Start the next rollout {_+1}")
node = self.select(root)
@ -345,17 +331,23 @@ class MCTS:
if node.raw_value == 0:
reward = await self.simulate(node)
else:
reward = {"test_score": node.raw_value, "score": node.value}
reward = {"test_score": node.raw_value, "score": node.raw_reward["score"]}
mcts_logger.log("MCTS", f"Terminal node's reward: {reward}")
self.backpropagate(node, reward)
else:
if node.visited > 0:
children = await self.expand(node)
node = random.choice(children)
reward = await self.simulate(node)
self.backpropagate(node, reward)
node, reward = await self.expand_and_simulate(node)
# self.backpropagate(node, reward)
return self.best_path(root)
async def expand_and_simulate(self, node):
# Expand and randomly select a child node, then simulate it
if node.visited > 0:
children = await self.expand(node)
node = random.choice(children)
reward = await self.simulate(node)
self.backpropagate(node, reward)
return node, reward
def load_tree(self):
def load_children_node(node):
mcts_logger.log("MCTS", f"Load node {node.id}'s child: {node.children}")

View file

@ -86,6 +86,8 @@ CUSTOM_DATASETS = [
("07_icr-identify-age-related-conditions", "Class"),
]
DSAGENT_DATASETS = [("concrete-strength", "Strength"), ("smoker-status", "smoking"), ("software-defects", "defects")]
def get_split_dataset_path(dataset_name, config):
datasets_dir = config["datasets_dir"]
@ -109,20 +111,20 @@ def get_split_dataset_path(dataset_name, config):
def get_user_requirement(task_name, config):
datasets_dir = config["datasets_dir"]
# datasets_dir = config["datasets_dir"]
if task_name in config["datasets"]:
dataset = config["datasets"][task_name]
data_path = os.path.join(datasets_dir, dataset["dataset"])
# data_path = os.path.join(datasets_dir, dataset["dataset"])
user_requirement = dataset["user_requirement"]
return data_path, user_requirement
return user_requirement
else:
raise ValueError(
f"Dataset {task_name} not found in config file. Available datasets: {config['datasets'].keys()}"
)
def save_datasets_dict_to_yaml(datasets_dict):
with open("datasets.yaml", "w") as file:
def save_datasets_dict_to_yaml(datasets_dict, name="datasets.yaml"):
with open(name, "w") as file:
yaml.dump(datasets_dict, file)
@ -201,11 +203,15 @@ class ExpDataset:
def get_raw_dataset(self):
raw_dir = Path(self.dataset_dir, self.name, "raw")
train_df = None
test_df = None
if not os.path.exists(Path(raw_dir, "train.csv")):
raise FileNotFoundError(f"Raw dataset `train.csv` not found in {raw_dir}")
else:
df = pd.read_csv(Path(raw_dir, "train.csv"))
return df
train_df = pd.read_csv(Path(raw_dir, "train.csv"))
if os.path.exists(Path(raw_dir, "test.csv")):
test_df = pd.read_csv(Path(raw_dir, "test.csv"))
return train_df, test_df
def get_dataset_info(self):
raw_df = pd.read_csv(Path(self.dataset_dir, self.name, "raw", "train.csv"))
@ -249,10 +255,10 @@ class ExpDataset:
return req
def save_dataset(self, target_col):
df = self.get_raw_dataset()
df, test_df = self.get_raw_dataset()
if not self.check_dataset_exists() or self.force_update:
print(f"Saving Dataset {self.name} in {self.dataset_dir}")
self.split_and_save(df, target_col)
self.split_and_save(df, target_col, test_df=test_df)
else:
print(f"Dataset {self.name} already exists")
if not self.check_datasetinfo_exists() or self.force_update:
@ -278,10 +284,14 @@ class ExpDataset:
df_target = df_target.drop(columns=[target_col])
df_target.to_csv(Path(path, f"split_{split}_target.csv"), index=False)
def split_and_save(self, df, target_col):
def split_and_save(self, df, target_col, test_df=None):
if not target_col:
raise ValueError("Target column not provided")
train, test = train_test_split(df, test_size=1 - TRAIN_TEST_SPLIT, random_state=SEED)
if test_df is None:
train, test = train_test_split(df, test_size=1 - TRAIN_TEST_SPLIT, random_state=SEED)
else:
train = df
test = test_df
train, dev = train_test_split(train, test_size=1 - TRAIN_DEV_SPLIT, random_state=SEED)
self.save_split_datasets(train, "train")
self.save_split_datasets(dev, "dev", target_col)
@ -304,7 +314,7 @@ class OpenMLExpDataset(ExpDataset):
raw_dir = Path(self.dataset_dir, self.name, "raw")
os.makedirs(raw_dir, exist_ok=True)
dataset_df.to_csv(Path(raw_dir, "train.csv"), index=False)
return dataset_df
return dataset_df, None
def get_dataset_info(self):
dataset_info = super().get_dataset_info()
@ -315,14 +325,9 @@ class OpenMLExpDataset(ExpDataset):
return dataset_info
# class HFExpDataset(ExpDataset):
# def __init__(self, name, dataset_dir, dataset_name, **kwargs):
# super().__init__(name, dataset_dir, **kwargs)
async def process_dataset(dataset, solution_designer, save_analysis_pool, datasets_dict):
async def process_dataset(dataset, solution_designer: SolutionDesigner, save_analysis_pool, datasets_dict):
if save_analysis_pool:
asyncio.run(solution_designer.generate_solutions(dataset.get_dataset_info(), dataset.name))
await solution_designer.generate_solutions(dataset.get_dataset_info(), dataset.name)
dataset_dict = create_dataset_dict(dataset)
datasets_dict["datasets"][dataset.name] = dataset_dict
@ -330,7 +335,7 @@ async def process_dataset(dataset, solution_designer, save_analysis_pool, datase
if __name__ == "__main__":
datasets_dir = "D:/work/automl/datasets"
force_update = False
save_analysis_pool = False
save_analysis_pool = True
datasets_dict = {"datasets": {}}
solution_designer = SolutionDesigner()
for dataset_id in OPENML_DATASET_IDS:
@ -341,4 +346,8 @@ if __name__ == "__main__":
custom_dataset = ExpDataset(dataset_name, datasets_dir, target_col=target_col, force_update=force_update)
asyncio.run(process_dataset(custom_dataset, solution_designer, save_analysis_pool, datasets_dict))
for dataset_name, target_col in DSAGENT_DATASETS:
custom_dataset = ExpDataset(dataset_name, datasets_dir, target_col=target_col, force_update=force_update)
asyncio.run(process_dataset(custom_dataset, solution_designer, save_analysis_pool, datasets_dict))
save_datasets_dict_to_yaml(datasets_dict)

64
expo/data/hf_data.py Normal file
View file

@ -0,0 +1,64 @@
import asyncio
import os
from pathlib import Path
import pandas as pd
from datasets import load_dataset
from expo.data.dataset import ExpDataset, process_dataset, save_datasets_dict_to_yaml
from expo.insights.solution_designer import SolutionDesigner
HFDATSETS = [
{"name": "sms_spam", "dataset_name": "ucirvine/sms_spam", "target_col": "label"},
{"name": "banking77", "dataset_name": "PolyAI/banking77", "target_col": "label"},
{"name": "gnad10", "dataset_name": "community-datasets/gnad10", "target_col": "label"},
{"name": "oxford-iiit-pet", "dataset_name": "timm/oxford-iiit-pet", "target_col": "label"},
{"name": "stanford_cars", "dataset_name": "tanganke/stanford_cars", "target_col": "label"},
{"name": "fashion_mnist", "dataset_name": "zalando-datasets/fashion_mnist", "target_col": "label"},
]
class HFExpDataset(ExpDataset):
train_ratio = 0.6
dev_ratio = 0.2
test_ratio = 0.2
def __init__(self, name, dataset_dir, dataset_name, **kwargs):
self.name = name
self.dataset_dir = dataset_dir
self.dataset_name = dataset_name
self.target_col = kwargs.get("target_col", "label")
self.dataset = load_dataset(dataset_name)
super().__init__(self.name, dataset_dir, **kwargs)
def get_raw_dataset(self):
raw_dir = Path(self.dataset_dir, self.name, "raw")
raw_dir.mkdir(parents=True, exist_ok=True)
if os.path.exists(Path(raw_dir, "train.csv")):
df = pd.read_csv(Path(raw_dir, "train.csv"))
else:
df = self.dataset["train"].to_pandas()
df.to_csv(Path(raw_dir, "train.csv"))
if os.path.exists(Path(raw_dir, "test.csv")):
test_df = pd.read_csv(Path(raw_dir, "test.csv"))
else:
if "test" in self.dataset:
test_df = self.dataset["test"].to_pandas()
test_df.to_csv(Path(raw_dir, "test.csv"))
else:
test_df = None
return df, test_df
if __name__ == "__main__":
dataset_dir = "D:/work/automl/datasets"
save_analysis_pool = True
datasets_dict = {"datasets": {}}
solution_designer = SolutionDesigner()
for dataset_meta in HFDATSETS:
hf_dataset = HFExpDataset(
dataset_meta["name"], dataset_dir, dataset_meta["dataset_name"], target_col=dataset_meta["target_col"]
)
asyncio.run(process_dataset(hf_dataset, solution_designer, save_analysis_pool, datasets_dict))
save_datasets_dict_to_yaml(datasets_dict, "hf_datasets.yaml")

View file

@ -151,3 +151,28 @@ datasets:
\ the target column `Class`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"
concrete-strength:
dataset: concrete-strength
metric: rmse
target_col: Strength
user_requirement: "This is a concrete-strength dataset. Your goal is to predict\
\ the target column `Strength`.\nPerform data analysis, data preprocessing,\
\ feature engineering, and modeling to predict the target. \nReport rmse on\
\ the eval data. Do not plot or make any visualizations.\n"
smoker-status:
dataset: smoker-status
metric: f1
target_col: smoking
user_requirement: "This is a smoker-status dataset. Your goal is to predict the\
\ target column `smoking`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 on the eval data.\
\ Do not plot or make any visualizations.\n"
software-defects:
dataset: software-defects
metric: f1
target_col: defects
user_requirement: "This is a software-defects dataset. Your goal is to predict\
\ the target column `defects`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 on the eval data.\
\ Do not plot or make any visualizations.\n"

View file

@ -34,7 +34,7 @@ class AugExperimenter(Experimenter):
di.role_dir = f"{di.role_dir}_{self.args.task}"
requirement = user_requirement + EXPS_PROMPT.format(experience=exps[i])
print(requirement)
score_dict = await self.run_di(di, requirement)
score_dict = await self.run_di(di, requirement, run_idx=i)
results.append(
{
"idx": i,

View file

@ -7,7 +7,7 @@ import pandas as pd
from expo.evaluation.evaluation import evaluate_score
from expo.MCTS import create_initial_state
from expo.research_assistant import ResearchAssistant
from expo.utils import DATA_CONFIG
from expo.utils import DATA_CONFIG, save_notebook
class Experimenter:
@ -16,7 +16,8 @@ class Experimenter:
def __init__(self, args, **kwargs):
self.args = args
self.start_time = datetime.datetime.now().strftime("%Y%m%d%H%M")
self.start_time_raw = datetime.datetime.now()
self.start_time = self.start_time_raw.strftime("%Y%m%d%H%M")
self.state = create_initial_state(
self.args.task,
start_task_id=1,
@ -25,7 +26,7 @@ class Experimenter:
name="",
)
async def run_di(self, di, user_requirement):
async def run_di(self, di, user_requirement, run_idx):
max_retries = 3
num_runs = 1
run_finished = False
@ -38,6 +39,7 @@ class Experimenter:
except Exception as e:
print(f"Error: {e}")
num_runs += 1
save_notebook(role=di, save_dir=self.result_path, name=f"{self.args.task}_{self.start_time}_{run_idx}")
if not run_finished:
score_dict = {"train_score": -1, "dev_score": -1, "test_score": -1, "score": -1}
return score_dict
@ -49,7 +51,7 @@ class Experimenter:
for i in range(self.args.num_experiments):
di = ResearchAssistant(node_id="0", use_reflection=self.args.reflection)
score_dict = await self.run_di(di, user_requirement)
score_dict = await self.run_di(di, user_requirement, run_idx=i)
results.append(
{"idx": i, "score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)}
)
@ -70,10 +72,10 @@ class Experimenter:
0,
{
"best_dev_score": best_dev_score,
"best_score_idx": best_score_idx,
"best_test_score": test_scores[best_score_idx],
"best_dev_score_idx": best_score_idx,
"best_dev_test_score": test_scores[best_score_idx],
"avg_test_score": avg_score,
"best_score": global_best_score,
"global_best_test_score": global_best_score,
},
)
self.save_result(results)
@ -99,11 +101,12 @@ class Experimenter:
return score_dict
def save_result(self, result):
end_time = datetime.datetime.now().strftime("%Y%m%d%H%M")
end_time_raw = datetime.datetime.now()
end_time = end_time_raw.strftime("%Y%m%d%H%M")
time_info = {
"start_time": self.start_time,
"end_time": end_time,
"duration (minutes)": float(end_time) - float(self.start_time),
"duration (seconds)": (end_time_raw - self.start_time_raw).seconds,
}
result = result.copy()
result.insert(0, time_info)

View file

@ -1,13 +1,21 @@
from expo.evaluation.visualize_mcts import get_tree_text
from expo.experimenter.experimenter import Experimenter
from expo.Greedy import Greedy
from expo.MCTS import MCTS
class MCTSExperimenter(Experimenter):
result_path: str = "results/mcts"
def __init__(self, args, greedy=False, **kwargs):
super().__init__(args, **kwargs)
self.greedy = greedy
async def run_experiment(self):
mcts = MCTS(root_node=None, max_depth=5)
if self.greedy:
mcts = Greedy(root_node=None, max_depth=5)
else:
mcts = MCTS(root_node=None, max_depth=5)
best_nodes = await mcts.search(
self.args.task,
self.data_config,
@ -22,8 +30,8 @@ class MCTSExperimenter(Experimenter):
text, num_generated_codes = get_tree_text(mcts.root_node)
text += f"Generated {num_generated_codes} unique codes.\n"
text += f"Best node: {best_node}, score: {best_node.raw_reward}\n"
text += f"Dev best node: {dev_best_node}, score: {dev_best_node.raw_reward}\n"
text += f"Best node: {best_node.id}, score: {best_node.raw_reward}\n"
text += f"Dev best node: {dev_best_node.id}, score: {dev_best_node.raw_reward}\n"
print(text)
self.save_tree(text)

View file

@ -84,9 +84,13 @@ class InstructionGenerator:
new_instructions = []
if len(data) == 0:
mcts_logger.log("MCTS", f"No insights available for task {task_id}")
return [original_instruction] # Return the original instruction if no insights are available
for item in data[:max_num]:
insights = item["Analysis"]
# return [original_instruction] # Return the original instruction if no insights are available
for i in range(max_num):
if len(data) == 0:
insights = "No insights available"
else:
item = data[i]
insights = item["Analysis"]
new_instruction = await InstructionGenerator.generate_new_instruction(original_instruction, insights)
new_instructions.append(new_instruction)
return new_instructions

View file

@ -10,7 +10,7 @@ from expo.experimenter.mcts import MCTSExperimenter
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("--name", type=str, default="")
parser.add_argument("--exp_mode", type=str, default="mcts", choices=["mcts", "aug", "base", "custom"])
parser.add_argument("--exp_mode", type=str, default="mcts", choices=["mcts", "aug", "base", "custom", "greedy"])
get_di_args(parser)
get_mcts_args(parser)
get_aug_exp_args(parser)
@ -41,6 +41,8 @@ def get_di_args(parser):
async def main(args):
if args.exp_mode == "mcts":
experimenter = MCTSExperimenter(args)
elif args.exp_mode == "greedy":
experimenter = MCTSExperimenter(args, greedy=True)
elif args.exp_mode == "aug":
experimenter = AugExperimenter(args)
elif args.exp_mode == "base":

View file

@ -7,8 +7,7 @@ from pathlib import Path
import nbformat
import yaml
from loguru import logger as _logger
# from nbclient import NotebookClient
from nbclient import NotebookClient
from nbformat.notebooknode import NotebookNode
from metagpt.roles.role import Role
@ -20,7 +19,9 @@ def load_data_config(file_path="data.yaml"):
return data_config
DATASET_CONFIG = load_data_config("datasets.yaml")
DATA_CONFIG = load_data_config()
DATA_CONFIG["datasets"].update(DATASET_CONFIG["datasets"])
def get_mcts_logger():
@ -92,15 +93,24 @@ def process_cells(nb: NotebookNode) -> NotebookNode:
def save_notebook(role: Role, save_dir: str = "", name: str = ""):
save_dir = Path(save_dir)
tasks = role.planner.plan.tasks
codes = [task.code for task in tasks if task.code]
clean_nb = nbformat.v4.new_notebook()
for code in codes:
clean_nb.cells.append(nbformat.v4.new_code_cell(code))
nb = process_cells(role.execute_code.nb)
file_path = save_dir / f"{name}.ipynb"
clean_file_path = save_dir / f"{name}_clean.ipynb"
nbformat.write(nb, file_path)
nbformat.write(clean_nb, clean_file_path)
async def load_execute_notebook(role):
tasks = role.planner.plan.tasks
codes = [task.code for task in tasks if task.code]
executor = role.execute_code
executor.nb = nbformat.v4.new_notebook()
executor.nb.client = NotebookClient(executor.nb)
# await executor.build()
for code in codes:
outputs, success = await executor.run(code)

View file

@ -6,8 +6,6 @@
"""
from __future__ import annotations
import json
from metagpt.actions import Action
from metagpt.prompts.di.write_analysis_code import (
CHECK_DATA_PROMPT,
@ -30,9 +28,10 @@ class WriteAnalysisCode(Action):
)
rsp = await self._aask(reflection_prompt, system_msgs=[REFLECTION_SYSTEM_MSG])
reflection = json.loads(CodeParser.parse_code(block=None, text=rsp))
return reflection["improved_impl"]
# reflection = json.loads(CodeParser.parse_code(block=None, text=rsp))
# return reflection["improved_impl"]
reflection = CodeParser.parse_code(block=None, text=rsp)
return reflection
async def run(
self,

View file

@ -40,15 +40,17 @@ Tests failed:
assert add(1, 2) == 3 # output: -1
assert add(1, 3) == 4 # output: -2
[reflection on previous impl]:
[reflection on previous impl]
The implementation failed the test cases where the input integers are 1 and 2. The issue arises because the code does not add the two integers together, but instead subtracts the second integer from the first. To fix this issue, we should change the operator from `-` to `+` in the return statement. This will ensure that the function returns the correct output for the given input.
[improved impl]:
[improved impl]
```python
def add(a: int, b: int) -> int:
"""
Given integers a and b, return the total value of a and b.
"""
return a + b
```
'''
REFLECTION_PROMPT = """
@ -60,17 +62,17 @@ Here is an example of debugging with reflection.
[context]
{context}
[previous impl]:
[previous impl]
{previous_impl}
[instruction]
Analyze your previous code and error in [context] step by step, provide me with improved method and code. Remember to follow [context] requirement. Don't forget to write code for steps behind the error step.
Output a json following the format:
```json
{{
"reflection": str = "Reflection on previous implementation",
"improved_impl": str = "Refined code after reflection (do not include nested code block here).",
}}
Output in the following format:
[reflection on previous impl]
...
[improved impl]:
```python
# your code
```
"""