Merge branch 'expo' into 'sela'

open-source sela

See merge request agents/exp_optimizer!28
This commit is contained in:
林义章 2024-10-18 08:27:49 +00:00
commit f0704553e0
37 changed files with 3341 additions and 21 deletions

2
.gitignore vendored

@@ -29,6 +29,7 @@ share/python-wheels/
MANIFEST
metagpt/tools/schemas/
examples/data/search_kb/*.json
expo/AutogluonModels
# PyInstaller
# Usually these files are written by a python scripts from a template
@@ -188,3 +189,4 @@ cov.xml
*-structure.json
*.dot
.python-version
expo/results/*

19
expo/Greedy.py Normal file

@@ -0,0 +1,19 @@
import random

from expo.MCTS import MCTS


class Greedy(MCTS):
    def best_child(self):
        if len(self.children) == 0:
            return self.root_node
        all_children = [child for children in self.children.values() for child in children]
        return max(all_children, key=lambda x: x.normalized_reward.get("dev_score", 0))


class Random(MCTS):
    def best_child(self):
        if len(self.children) == 0:
            return self.root_node
        all_children = [child for children in self.children.values() for child in children]
        return random.choice(all_children)

457
expo/MCTS.py Normal file

@@ -0,0 +1,457 @@
import json
import math
import os
import pickle
import random
import shutil
import numpy as np
import pandas as pd
from expo.data.custom_task import get_mle_bench_requirements, get_mle_task_id
from expo.data.dataset import generate_task_requirement, get_split_dataset_path
from expo.evaluation.evaluation import evaluate_score
from expo.insights.instruction_generator import InstructionGenerator
from expo.research_assistant import ResearchAssistant, TimeoutException
from expo.utils import get_exp_pool_path, load_execute_notebook, mcts_logger
from metagpt.tools.tool_recommend import ToolRecommender
from metagpt.utils.common import read_json_file
def initialize_di_root_node(state, reflection: bool = True):
role = ResearchAssistant(
node_id="0",
start_task_id=state["start_task_id"],
use_reflection=reflection,
role_dir=state["node_dir"],
role_timeout=state["role_timeout"],
)
return role, Node(parent=None, state=state, action=None, value=0)
def create_initial_state(task, start_task_id, data_config, args):
external_eval = args.external_eval
if args.custom_dataset_dir:
dataset_config = None
datasets_dir = args.custom_dataset_dir
requirement = get_mle_bench_requirements(
args.custom_dataset_dir, data_config, special_instruction=args.special_instruction
)
exp_pool_path = None
# external_eval = False # make sure external eval is false if custom dataset is used
task = get_mle_task_id(args.custom_dataset_dir)
else:
dataset_config = data_config["datasets"][task]
if dataset_config["metric"] == "rmse":
args.low_is_better = True
datasets_dir = get_split_dataset_path(task, data_config)
requirement = generate_task_requirement(
task, data_config, is_di=True, special_instruction=args.special_instruction
)
exp_pool_path = get_exp_pool_path(task, data_config, pool_name="ds_analysis_pool")
initial_state = {
"task": task,
"work_dir": data_config["work_dir"],
"node_dir": os.path.join(data_config["work_dir"], data_config["role_dir"], f"{task}{args.name}"),
"dataset_config": dataset_config,
"datasets_dir": datasets_dir, # won't be used if external eval is used
"exp_pool_path": exp_pool_path,
"requirement": requirement,
"has_run": False,
"start_task_id": start_task_id,
"low_is_better": args.low_is_better,
"role_timeout": args.role_timeout,
"external_eval": external_eval,
"custom_dataset_dir": args.custom_dataset_dir,
}
os.makedirs(initial_state["node_dir"], exist_ok=True)
return initial_state
class Node:
state: dict = {}
action: str = None
value: float = 0
visited: int = 0
children: list = []
normalized_reward: dict = {"train_score": 0, "dev_score": 0, "test_score": 0}
parent = None
def __init__(self, parent=None, state=None, action=None, value=0, max_depth=4, **kwargs):
self.state = state
self.action = action
self.value = value
self.raw_value = 0
self.raw_reward = dict()
self.parent = parent
self.children = []
self.max_depth = max_depth
self.depth = self.generate_depth()
self.id = self.generate_id()
if self.parent is not None:
self.save_node()
def avg_value(self):
if self.visited == 0:
return 0
return self.value / self.visited
def __hash__(self):
return hash(self.id)
def save_node(self):
os.makedirs(self.state["node_dir"], exist_ok=True)
with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), "wb") as f:
pickle.dump(self, f)
def load_node(self):
with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), "rb") as f:
return pickle.load(f)
def get_depth(self):
return self.depth
def get_node_dir(self):
return self.state["node_dir"]
def generate_depth(self):
if self.parent is None:
return 0
else:
return self.parent.depth + 1
def generate_id(self):
if self.parent is None:
return "0"
else:
num_sibling = len(self.parent.children)
return f"{self.parent.id}-{num_sibling}"
def is_terminal(self):
return int(self.state["start_task_id"]) == self.max_depth + 1 # TODO: Check if this is correct or +1
def is_fully_expanded(self):
return len(self.children) > 0
def add_child(self, child_node):
self.children.append(child_node)
def update(self, reward: dict, child_node=None):
if child_node is not None:
child_role = child_node.load_role()
role = self.load_role()
role.update_til_start_task(child_role)
role.save_state()
else:
self.raw_value = reward["test_score"]
self.value += reward["score"]
self.visited += 1
self.save_node()
def get_role_path(self):
fname = f"Node-{self.id}.json"
role_path = os.path.join(self.state["node_dir"], fname)
return role_path
def load_role(self):
role_dict = read_json_file(self.get_role_path())
if role_dict.get("tool_recommender") is None:
role_dict["tool_recommender"] = ToolRecommender()
elif isinstance(role_dict.get("tool_recommender", {}).get("tools"), dict):
role_dict["tool_recommender"]["tools"] = list(role_dict["tool_recommender"]["tools"].keys())
role = ResearchAssistant(**role_dict)
if self.parent is not None: # TODO: Check this
parent_role = self.parent.load_role()
role.update_til_start_task(parent_role, backward=False)
role.remap_tasks()
return role
def save_new_role(self, role: ResearchAssistant):
role.node_id = self.id
role.start_task_id = self.state["start_task_id"]
role.state_saved = False
role.change_next_instruction(self.action)
mcts_logger.log("MCTS", f"Saving new role: {role.node_id}")
role = role.model_copy()
role.save_state(static_save=True)
async def expand(self, max_children: int, instruction_generator: InstructionGenerator):
if self.is_fully_expanded():
return
role = self.load_role()
original_instruction = role.get_next_instruction()
insights = await instruction_generator.generate_new_instructions(
task_id=role.start_task_id + 1,
original_instruction=original_instruction,
max_num=max_children,
)
new_state = self.state.copy()
new_state["start_task_id"] += 1
for insight in insights:
new_role = role.model_copy()
node = Node(parent=self, state=new_state, action=insight, value=0)
node.save_new_role(new_role)
self.add_child(node)
def get_predictions_path(self, split):
return os.path.join(self.state["node_dir"], f"Node-{self.id}-{split}_predictions.csv")
def get_and_move_predictions(self, split):
if not os.path.exists(self.get_predictions_path(split)):
pred_path = os.path.join(self.state["work_dir"], self.state["task"], f"{split}_predictions.csv")
shutil.copy(pred_path, self.get_predictions_path(split))
os.remove(pred_path)
return pd.read_csv(self.get_predictions_path(split))
def get_gt(self, split):
gt_path = os.path.join(self.state["datasets_dir"][f"{split}_target"])
return pd.read_csv(gt_path)
def evaluate_prediction(self, split):
preds = self.get_and_move_predictions(split)["target"]
gt = self.get_gt(split)["target"]
metric = self.state["dataset_config"]["metric"]
return evaluate_score(preds, gt, metric)
def evaluate_simulation(self, score_dict):
if self.state["external_eval"]: # use external evaluation
scores = {"dev_score": self.evaluate_prediction("dev"), "test_score": self.evaluate_prediction("test")}
scores["score"] = scores["dev_score"]
score_dict.update(scores)
else:
self.get_and_move_predictions("dev")
self.get_and_move_predictions("test")
return score_dict
async def run_node(self, role=None):
if self.is_terminal() and role is not None:
if role.state_saved:
return self.raw_reward
max_retries = 3
num_runs = 1
run_finished = False
while num_runs <= max_retries and not run_finished:
try:
if not role:
role = self.load_role()
await load_execute_notebook(role) # execute previous notebook's code
await role.run(with_message="continue")
else:
await role.run(with_message=self.state["requirement"])
score_dict = await role.get_score()
score_dict = self.evaluate_simulation(score_dict)
self.raw_reward = score_dict
run_finished = True
except TimeoutException as e:
mcts_logger.log("MCTS", f"Role-level timeout: {e}")
break
except Exception as e:
mcts_logger.log("MCTS", f"Error in running the role: {e}")
num_runs += 1
if not run_finished:
mcts_logger.log("MCTS", f"Role {role.node_id} failed to run")
if self.state["low_is_better"]:
score_dict = {"test_score": np.inf, "dev_score": np.inf, "score": np.inf}
else:
score_dict = {"test_score": 0, "dev_score": 0, "score": 0}
self.raw_reward = score_dict
if self.state["low_is_better"]:
# normalized the score to be between 0 and 1, and higher is better
def normalize_score(score):
if score == -1:
return 0
return 1 / (1 + score)
score_dict = {k: normalize_score(v) for k, v in score_dict.items()}
self.normalized_reward = score_dict
result_dict = role.get_solution()
return score_dict, result_dict
class MCTS:
# data_path
root_node: Node = None
children: dict = {}
max_depth: int = None
c_explore: float = 1.4
c_unvisited: float = 0.8
node_order: list = []
# insight generator
instruction_generator: InstructionGenerator = None
def __init__(self, root_node, max_depth, use_fixed_insights):
self.root_node = root_node
self.max_depth = max_depth
self.use_fixed_insights = use_fixed_insights
def select(self, node: Node):
node = self.best_child()
mcts_logger.log("MCTS", f"Selected node id: {node.id}")
return node
def best_child(self):
def uct(node: Node):
n_visits = node.visited if node.visited else self.c_unvisited
avg_value = node.avg_value() if node.visited else node.value / self.c_unvisited
return avg_value + self.c_explore * math.sqrt(math.log(node.parent.visited) / n_visits)
if len(self.children) == 0:
return self.root_node
all_children = [child for children in self.children.values() for child in children]
return max(all_children, key=uct)
async def expand(self, node: Node, max_children=5):
await node.expand(max_children, self.instruction_generator)
if node not in self.children or not self.children[node]:
self.children[node] = node.children
return node.children
async def simulate(self, node: Node, role=None):
"Returns the reward for a random simulation (to completion) of `node`"
mcts_logger.log("MCTS", f"Start simulating node {node.id}:")
while node.children:
node = random.choice(node.children)
reward, result_dict = await node.run_node(role)
mcts_logger.log("MCTS", f"Simulated node's reward: {reward}")
# TODO: add new insights
return reward
def backpropagate(self, node: Node, reward):
child_node = node
node.update(reward)
node = node.parent
while node is not None:
node.update(reward, child_node)
node, child_node = node.parent, node
def best_path(self, root: Node):
best_child = root
global_best_score = root.normalized_reward["test_score"]
dev_best_score = root.normalized_reward["dev_score"]
def bfs(node: Node, best_score, best_child: Node, split):
assert split in ["test_score", "dev_score"]
if node not in self.children:
return best_score, best_child
for child in self.children[node]:
score = child.normalized_reward[split]
print(child.id, split, score)
if score > best_score:
best_score = score
best_child = child
best_score, best_child = bfs(child, best_score, best_child, split)
return best_score, best_child
_, global_best_child = bfs(root, global_best_score, best_child, "test_score")
_, dev_best_child = bfs(root, dev_best_score, best_child, "dev_score")
return {"dev_best": dev_best_child, "global_best": global_best_child, "scores": self.get_score_order_dict()}
def get_num_simulations(self):
return self.root_node.visited
def save_node_order(self, node_id):
self.node_order.append(node_id)
with open(os.path.join(self.root_node.state["node_dir"], "node_order.json"), "w") as f:
json.dump(self.node_order, f)
def load_node_order(self):
with open(os.path.join(self.root_node.state["node_dir"], "node_order.json"), "r") as f:
self.node_order = json.load(f)
def get_score_order_dict(self):
scores = {"dev": [], "test": [], "dev_raw": [], "test_raw": []}
for node_id in self.node_order:
node = Node(parent=None, state=self.root_node.state, action=None, value=0)
node.id = node_id
node = node.load_node()
scores["dev"].append(node.normalized_reward["dev_score"])
scores["test"].append(node.normalized_reward["test_score"])
scores["dev_raw"].append(node.raw_reward["dev_score"])
scores["test_raw"].append(node.raw_reward["test_score"])
return scores
async def search(self, state, args):
reflection = args.reflection
load_tree = args.load_tree
rollouts = args.rollouts
from_scratch = args.from_scratch
role, root = initialize_di_root_node(state, reflection=reflection)
self.root_node = root
self.instruction_generator = InstructionGenerator(
state=state, use_fixed_insights=self.use_fixed_insights, from_scratch=from_scratch
)
await self.instruction_generator.initialize()
tree_loaded = False
if load_tree:
tree_loaded = self.load_tree()
mcts_logger.log("MCTS", f"Number of simulations: {self.get_num_simulations()}")
mcts_logger.log("MCTS", f"Tree loaded: {tree_loaded}")
if not tree_loaded:
rollouts -= 2 # 2 rollouts for the initial tree
if rollouts < 0:
raise ValueError("Rollouts must be greater than 2 if there is no tree to load")
self.children[root] = []
reward = await self.simulate(root, role)
self.backpropagate(root, reward)
node, reward = await self.expand_and_simulate(root)
# self.backpropagate(node, reward)
self.save_node_order(root.id)
self.save_node_order(node.id)
else:
root = self.root_node
self.load_node_order()
for _ in range(rollouts): # number of rollouts
mcts_logger.log("MCTS", f"Start the next rollout {_+1}")
node = self.select(root)
if node.is_terminal():
if node.raw_value == 0:
reward = await self.simulate(node)
else:
reward = {"test_score": node.raw_value, "score": node.raw_reward["score"]}
mcts_logger.log("MCTS", f"Terminal node's reward: {reward}")
self.backpropagate(node, reward)
else:
node, reward = await self.expand_and_simulate(node)
# self.backpropagate(node, reward)
self.save_node_order(node.id)
return self.best_path(root)
async def expand_and_simulate(self, node):
# Expand and randomly select a child node, then simulate it
if node.visited > 0:
children = await self.expand(node)
node = random.choice(children)
reward = await self.simulate(node)
self.backpropagate(node, reward)
return node, reward
def load_tree(self):
def load_children_node(node):
mcts_logger.log("MCTS", f"Load node {node.id}'s child: {node.children}")
if node.is_terminal() or not node.children:
return
for child in node.children:
child.load_node()
self.children[child] = child.children
load_children_node(child)
# Load all pkl files in the node_dir
all_pkl_files = os.listdir(self.root_node.state["node_dir"])
all_pkl_files = [f for f in all_pkl_files if f.endswith(".pkl")]
if os.path.exists(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl")):
with open(os.path.join(self.root_node.state["node_dir"], "Node-0.pkl"), "rb") as f:
self.root_node = pickle.load(f)
self.children[self.root_node] = self.root_node.children
load_children_node(self.root_node)
if self.children:
return True
return False

288
expo/README.md Normal file

@@ -0,0 +1,288 @@
# SELA: Tree-Search Enhanced LLM Agents for Automated Machine Learning
## 1. Data Preparation
- Download datasets: https://deepwisdom.feishu.cn/drive/folder/RVyofv9cvlvtxKdddt2cyn3BnTc?from=from_copylink
- Download and prepare datasets from scratch:
```
cd expo/data
python dataset.py --save_analysis_pool
python hf_data.py --save_analysis_pool
```
## 2. Configs
### Data Config
`datasets.yaml` provides the base prompts, metrics, and target columns for the respective datasets.
- Set `datasets_dir` in `data.yaml` to the root directory that contains all the datasets (a minimal access sketch follows).
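For illustration, the config can then be read through `expo.utils.DATA_CONFIG` (assuming, as elsewhere in this README, that `DATA_CONFIG` holds the parsed `data.yaml` paths together with the `datasets.yaml` entries):

```python
# Hedged sketch: inspect the data config used throughout expo.
# Assumes DATA_CONFIG combines data.yaml (paths) and datasets.yaml (per-dataset info).
from expo.utils import DATA_CONFIG

print(DATA_CONFIG["datasets_dir"])                       # root directory of all datasets
print(DATA_CONFIG["datasets"]["titanic"]["metric"])      # e.g. "f1"
print(DATA_CONFIG["datasets"]["titanic"]["target_col"])  # e.g. "Survived"
```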
### LLM Config
```
llm:
api_type: 'openai'
model: deepseek-coder
base_url: "https://oneapi.deepwisdom.ai/v1"
api_key: sk-xxx
temperature: 0.5
```
### Budget
Experiments are run with a rollout budget of k = 5, 10, or 20.
### Prompt Usage
- Use the function `generate_task_requirement` in `dataset.py` to get the task requirement.
- If the method is non-DI-based, set `is_di=False`.
- Use `utils.DATA_CONFIG` as `data_config` (see the sketch below).
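As a minimal sketch (the task name `titanic` is only an example), a non-DI baseline could obtain its prompt like this:

```python
# Hedged sketch: build the task requirement prompt for a non-DI baseline.
from expo.data.dataset import generate_task_requirement
from expo.utils import DATA_CONFIG

# is_di=False because this assumes a method that is not DI-based.
requirement = generate_task_requirement("titanic", DATA_CONFIG, is_di=False)
print(requirement)
```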
## 3. SELA
### Run SELA
#### Setup
In the root directory, run:
```
pip install -e .
cd expo
pip install -r requirements.txt
```
#### Run
- `python run_experiment.py --exp_mode mcts --task titanic --rollouts 10`
If the dataset uses a regression metric (e.g. rmse), remember to add `--low_is_better`:
- `python run_experiment.py --exp_mode mcts --task house-prices --rollouts 10 --low_is_better`
In addition to the generated insights, you can include the fixed insights saved in `expo/insights/fixed_insights.json` by adding:
- `--use_fixed_insights`
#### Ablation Study
**DI RandomSearch**
- Single insight
`python run_experiment.py --exp_mode aug --task titanic --aug_mode single`
- Set insight
`python run_experiment.py --exp_mode aug --task titanic --aug_mode set`
## 4. Evaluation
Each baseline needs to produce `dev_predictions.csv` and `test_predictions.csv`. Each CSV file only needs a `target` column.
- Use the function `evaluate_score` from `expo/evaluation/evaluation.py` to evaluate (see the sketch below).
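As a rough usage sketch (the file paths below are placeholders for wherever your baseline writes its predictions and wherever the split target file lives):

```python
# Hedged sketch: score a baseline's dev predictions against the saved dev targets.
import pandas as pd

from expo.evaluation.evaluation import evaluate_score

preds = pd.read_csv("dev_predictions.csv")["target"]   # produced by the baseline
gt = pd.read_csv("split_dev_target.csv")["target"]     # written by expo/data/dataset.py
score = evaluate_score(preds, gt, "f1")                # use the metric listed in datasets.yaml
print(score)
```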
#### MLE-Bench
**Note: mle-bench requires Python 3.11 or higher.**
```
git clone https://github.com/openai/mle-bench.git
cd mle-bench
pip install -e .
```
```
mlebench prepare -c <competition-id> --data-dir <dataset-dir-save-path>
```
Enter the following command to run the experiment:
```
python run_experiment.py --exp_mode mcts --custom_dataset_dir <dataset-dir-save-path/prepared/public> --rollouts 10 --from_scratch --role_timeout 3600
```
## 5. Baselines
### DS Agent
```
git clone https://github.com/guosyjlu/DS-Agent.git
```
Modify the following lines in `deployment/generate.py` (lines 46-48) as shown below, so that DeepSeek is used instead of OpenAI's API:
```python
messages = [{"role": "user", "content": prompt}]
if 'gpt' in llm:
response = openai.ChatCompletion.create(**{"messages": messages,**raw_request})
raw_completion = response["choices"][0]["message"]["content"]
elif llm == 'deepseek-coder':
from openai import OpenAI
client = OpenAI(
api_key="yours",
base_url="https://oneapi.deepwisdom.ai/v1"
)
response = client.chat.completions.create(
model="deepseek-coder",
messages=[
# {"role": "system", "content": "You are a helpful assistant"},
{"role": "user", "content": prompt},
],
temperature=temperature,
stream=False
)
raw_completion = response.choices[0].message.content
completion = raw_completion.split("```python")[1].split("```")[0]
```
After making the changes, create a new `deployment/test.sh` and run the following two lines separately, where `$TASK` is the name of the task you want to test:
```
python -u generate.py --llm deepseek-coder --task $TASK --shot 1 --retrieval > "$TASK".txt 2>&1
python -u evaluation.py --path "deepseek-coder_True_1" --task $TASK --device 0 > "$TASK"_eval.txt 2>&1
```
### AIDE
#### Setup
```
git clone https://github.com/WecoAI/aideml.git
```
Modify `aideml/aide/utils/config.yaml`: change `k_fold_validation`, the `code` model, and the `feedback` model as follows:
```yaml
# agent hyperparams
agent:
# how many improvement iterations to run
steps: 10
# whether to instruct the agent to use CV (set to 1 to disable)
k_fold_validation: 1
# LLM settings for coding
code:
model: deepseek-coder
temp: 0.5
# LLM settings for evaluating program output / tracebacks
feedback:
model: deepseek-coder
temp: 0.5
# hyperparameters for the tree search
search:
max_debug_depth: 3
debug_prob: 0.5
num_drafts: 5
```
Since DeepSeek is compatible with OpenAI's API, set `base_url` to your own URL and `api_key` to your API key:
```
export OPENAI_API_KEY="your api key"
export OPENAI_BASE_URL="your own url"
```
Modify line 30 and the following lines of `aideml/aide/backend/__init__.py`:
```python
model_kwargs = model_kwargs | {
"model": model,
"temperature": temperature,
"max_tokens": max_tokens,
}
if "claude-" in model:
query_func = backend_anthropic.query
else:
query_func = backend_openai.query
```
Since DeepSeek-V2.5 no longer supports system messages when using function calls, modify line 312 of `aideml/aide/agent.py`:
```python
response = cast(
dict,
query(
system_message=None,
user_message=prompt,
func_spec=review_func_spec,
model=self.acfg.feedback.model,
temperature=self.acfg.feedback.temp,
),
)
```
Modify and install:
```
cd aideml
pip install -e .
```
#### Run
Run the following script to get the results. A `log` folder and a `workspace` folder will be generated in the current directory: the `log` folder contains the experiment configuration and the generated solutions, and the `workspace` folder saves the final results produced by AIDE.
```
python experimenter/aide.py
```
### Autogluon
#### Setup
```
pip install -U pip
pip install -U setuptools wheel
pip install autogluon
```
For Tabular data:
```
python run_experiment.py --exp_mode autogluon --task {task_name}
```
For Multimodal data:
```
python run_experiment.py --exp_mode autogluon --task {task_name} --is_multimodal
```
Replace {task_name} with the specific task you want to run.
### AutoSklearn
#### System requirements
auto-sklearn has the following system requirements:
- Linux operating system (for example Ubuntu)
- Python (>=3.7)
- C++ compiler (with C++11 support)
In case you try to install auto-sklearn on a system for which no wheel files for the `pyrfr` package are provided, you also need:
- SWIG ([get SWIG here](https://www.swig.org/survey.html))
For an explanation of the missing Microsoft Windows and macOS support, please check the section [Windows/macOS compatibility](https://automl.github.io/auto-sklearn/master/installation.html#windows-macos-compatibility).
#### Setup
```
pip install auto-sklearn
```
#### Run
```
python run_experiment.py --exp_mode autosklearn --task titanic
```
### Base DI
For setup, see Section 4.
- `python run_experiment.py --exp_mode base --task titanic --num_experiments 10`
- Specifically instruct DI to use AutoGluon: `--special_instruction ag`
- Specifically instruct DI to use the stacking ensemble method: `--special_instruction stacking`

3
expo/data.yaml Normal file

@@ -0,0 +1,3 @@
datasets_dir: "D:/work/automl/datasets" # path to the datasets directory
work_dir: ../workspace # path to the workspace directory
role_dir: storage/SELA # path to the role directory

74
expo/data/custom_task.py Normal file

@@ -0,0 +1,74 @@
import os
from expo.data.dataset import SPECIAL_INSTRUCTIONS
from expo.experimenter.mle_bench.instructions import (
ADDITIONAL_NOTES,
INSTRUCTIONS,
INSTRUCTIONS_OBFUSCATED,
)
MLE_BENCH_FILES = ["description.md", "description_obfuscated.md"]
MLE_REQUIREMENTS = """
{instructions}
{additonal_notes}
COMPETITION INSTRUCTIONS
------
{task_description}
## More Instructions
- output_dir: {output_dir}
- Besides `submission.csv`, you should also save your output in the output directory.
- You should split the training data into train and dev set.
- You should use the dev set to improve your model. Print the final dev set score after training.
- Save the prediction results of BOTH the dev set and test set in `dev_predictions.csv` and `test_predictions.csv` respectively in the output directory. They should be in the same format as the `submission.csv`.
- Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. {special_instruction}
**Do not make any plots or visualizations.**
"""
def get_mle_task_id(dataset_dir):
return dataset_dir.split("/")[-3]
def get_mle_is_lower_better(task):
from mlebench.data import get_leaderboard
from mlebench.registry import registry
competition = registry.get_competition(task)
competition_leaderboard = get_leaderboard(competition)
return competition.grader.is_lower_better(competition_leaderboard)
def get_mle_bench_requirements(dataset_dir, data_config, special_instruction, obfuscated=False):
work_dir = data_config["work_dir"]
task = get_mle_task_id(dataset_dir)
output_dir = f"{work_dir}/{task}"
final_output_dir = f"{work_dir}/submission"
os.makedirs(output_dir, exist_ok=True)
if special_instruction:
special_instruction = SPECIAL_INSTRUCTIONS[special_instruction]
else:
special_instruction = ""
if obfuscated:
instructions = INSTRUCTIONS_OBFUSCATED.format(dataset_dir=dataset_dir, output_dir=final_output_dir)
task_file = "description_obfuscated.md"
else:
instructions = INSTRUCTIONS.format(dataset_dir=dataset_dir, output_dir=output_dir)
task_file = "description.md"
with open(os.path.join(dataset_dir, task_file), encoding="utf-8") as f:
task_description = f.read()
mle_requirement = MLE_REQUIREMENTS.format(
instructions=instructions,
additonal_notes=ADDITIONAL_NOTES,
task_description=task_description,
output_dir=output_dir,
special_instruction=special_instruction,
)
print(mle_requirement)
return mle_requirement

395
expo/data/dataset.py Normal file

@@ -0,0 +1,395 @@
import argparse
import asyncio
import json
import os
from pathlib import Path
import openml
import pandas as pd
import yaml
from sklearn.model_selection import train_test_split
from expo.insights.solution_designer import SolutionDesigner
from expo.utils import DATA_CONFIG
BASE_USER_REQUIREMENT = """
This is a {datasetname} dataset. Your goal is to predict the target column `{target_col}`.
Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target.
Report {metric} on the eval data. Do not plot or make any visualizations.
"""
USE_AG = """
- Please use autogluon for model training with presets='medium_quality', time_limit=None, give dev dataset to tuning_data, and use right eval_metric.
"""
TEXT_MODALITY = """
- You could use models from transformers library for this text dataset.
- Use gpu if available for faster training.
"""
IMAGE_MODALITY = """
- You could use models from transformers/torchvision library for this image dataset.
- Use gpu if available for faster training.
"""
STACKING = """
- To avoid overfitting, train a weighted ensemble model such as StackingClassifier or StackingRegressor.
- You could do some quick model prototyping to see which models work best and then use them in the ensemble.
"""
SPECIAL_INSTRUCTIONS = {"ag": USE_AG, "stacking": STACKING, "text": TEXT_MODALITY, "image": IMAGE_MODALITY}
DI_INSTRUCTION = """
## Attention
1. Please do not leak the target label in any form during training.
2. Test set does not have the target column.
3. When conducting data exploration or analysis, print out the results of your findings.
4. You should perform transformations on train, dev, and test sets at the same time (it's a good idea to define functions for this and avoid code repetition).
5. When scaling or transforming features, make sure the target column is not included.
6. You could utilize dev set to validate and improve model training. {special_instruction}
## Saving Dev and Test Predictions
1. Save the prediction results of BOTH the dev set and test set in `dev_predictions.csv` and `test_predictions.csv` respectively in the output directory.
- Both files should contain a single column named `target` with the predicted values.
2. Make sure the prediction results are in the same format as the target column in the original training set.
- For instance, if the original target column is a list of string, the prediction results should also be strings.
## Output Performance
Print the train and dev set performance in the last step.
# Output dir
{output_dir}
"""
TASK_PROMPT = """
# User requirement
{user_requirement}
{additional_instruction}
# Data dir
train set (with labels): {train_path}
dev set (with labels): {dev_path}
test set (without labels): {test_path}
dataset description: {data_info_path} (During EDA, you can use this file to get additional information about the dataset)
"""
SEED = 100
TRAIN_TEST_SPLIT = 0.8
TRAIN_DEV_SPLIT = 0.75
OPENML_DATASET_IDS = [
# reg
41021,
42727,
41980,
42225,
531,
# cls
41143,
31,
42733,
41162,
1067,
# multi cls
40498,
40982,
12,
40984,
4538,
]
CUSTOM_DATASETS = [
("04_titanic", "Survived"),
("05_house-prices-advanced-regression-techniques", "SalePrice"),
("06_santander-customer-transaction-prediction", "target"),
("07_icr-identify-age-related-conditions", "Class"),
]
DSAGENT_DATASETS = [("concrete-strength", "Strength"), ("smoker-status", "smoking"), ("software-defects", "defects")]
def get_split_dataset_path(dataset_name, config):
datasets_dir = config["datasets_dir"]
if dataset_name in config["datasets"]:
dataset = config["datasets"][dataset_name]
data_path = os.path.join(datasets_dir, dataset["dataset"])
split_datasets = {
"train": os.path.join(data_path, "split_train.csv"),
"dev": os.path.join(data_path, "split_dev.csv"),
"dev_wo_target": os.path.join(data_path, "split_dev_wo_target.csv"),
"dev_target": os.path.join(data_path, "split_dev_target.csv"),
"test": os.path.join(data_path, "split_test.csv"),
"test_wo_target": os.path.join(data_path, "split_test_wo_target.csv"),
"test_target": os.path.join(data_path, "split_test_target.csv"),
}
return split_datasets
else:
raise ValueError(
f"Dataset {dataset_name} not found in config file. Available datasets: {config['datasets'].keys()}"
)
def get_user_requirement(task_name, config):
# datasets_dir = config["datasets_dir"]
if task_name in config["datasets"]:
dataset = config["datasets"][task_name]
# data_path = os.path.join(datasets_dir, dataset["dataset"])
user_requirement = dataset["user_requirement"]
return user_requirement
else:
raise ValueError(
f"Dataset {task_name} not found in config file. Available datasets: {config['datasets'].keys()}"
)
def save_datasets_dict_to_yaml(datasets_dict, name="datasets.yaml"):
with open(name, "w") as file:
yaml.dump(datasets_dict, file)
def create_dataset_dict(dataset):
dataset_dict = {
"dataset": dataset.name,
"user_requirement": dataset.create_base_requirement(),
"metric": dataset.get_metric(),
"target_col": dataset.target_col,
}
return dataset_dict
def generate_di_instruction(output_dir, special_instruction):
if special_instruction:
special_instruction_prompt = SPECIAL_INSTRUCTIONS[special_instruction]
else:
special_instruction_prompt = ""
additional_instruction = DI_INSTRUCTION.format(
output_dir=output_dir, special_instruction=special_instruction_prompt
)
return additional_instruction
def generate_task_requirement(task_name, data_config, is_di=True, special_instruction=None):
user_requirement = get_user_requirement(task_name, data_config)
split_dataset_path = get_split_dataset_path(task_name, data_config)
train_path = split_dataset_path["train"]
dev_path = split_dataset_path["dev"]
test_path = split_dataset_path["test_wo_target"]
work_dir = data_config["work_dir"]
output_dir = f"{work_dir}/{task_name}"
datasets_dir = data_config["datasets_dir"]
data_info_path = f"{datasets_dir}/{task_name}/dataset_info.json"
if is_di:
additional_instruction = generate_di_instruction(output_dir, special_instruction)
else:
additional_instruction = ""
user_requirement = TASK_PROMPT.format(
user_requirement=user_requirement,
train_path=train_path,
dev_path=dev_path,
test_path=test_path,
additional_instruction=additional_instruction,
data_info_path=data_info_path,
)
print(user_requirement)
return user_requirement
class ExpDataset:
description: str = None
metadata: dict = None
dataset_dir: str = None
target_col: str = None
name: str = None
def __init__(self, name, dataset_dir, **kwargs):
self.name = name
self.dataset_dir = dataset_dir
self.target_col = kwargs.get("target_col", None)
self.force_update = kwargs.get("force_update", False)
self.save_dataset(target_col=self.target_col)
def check_dataset_exists(self):
fnames = [
"split_train.csv",
"split_dev.csv",
"split_test.csv",
"split_dev_wo_target.csv",
"split_dev_target.csv",
"split_test_wo_target.csv",
"split_test_target.csv",
]
for fname in fnames:
if not os.path.exists(Path(self.dataset_dir, self.name, fname)):
return False
return True
def check_datasetinfo_exists(self):
return os.path.exists(Path(self.dataset_dir, self.name, "dataset_info.json"))
def get_raw_dataset(self):
raw_dir = Path(self.dataset_dir, self.name, "raw")
train_df = None
test_df = None
if not os.path.exists(Path(raw_dir, "train.csv")):
raise FileNotFoundError(f"Raw dataset `train.csv` not found in {raw_dir}")
else:
train_df = pd.read_csv(Path(raw_dir, "train.csv"))
if os.path.exists(Path(raw_dir, "test.csv")):
test_df = pd.read_csv(Path(raw_dir, "test.csv"))
return train_df, test_df
def get_dataset_info(self):
raw_df = pd.read_csv(Path(self.dataset_dir, self.name, "raw", "train.csv"))
metadata = {
"NumberOfClasses": raw_df[self.target_col].nunique(),
"NumberOfFeatures": raw_df.shape[1],
"NumberOfInstances": raw_df.shape[0],
"NumberOfInstancesWithMissingValues": int(raw_df.isnull().any(axis=1).sum()),
"NumberOfMissingValues": int(raw_df.isnull().sum().sum()),
"NumberOfNumericFeatures": raw_df.select_dtypes(include=["number"]).shape[1],
"NumberOfSymbolicFeatures": raw_df.select_dtypes(include=["object"]).shape[1],
}
df_head_text = self.get_df_head(raw_df)
dataset_info = {
"name": self.name,
"description": "",
"target_col": self.target_col,
"metadata": metadata,
"df_head": df_head_text,
}
return dataset_info
def get_df_head(self, raw_df):
return raw_df.head().to_string(index=False)
def get_metric(self):
dataset_info = self.get_dataset_info()
num_classes = dataset_info["metadata"]["NumberOfClasses"]
if num_classes == 2:
metric = "f1 binary"
elif 2 < num_classes <= 200:
metric = "f1 weighted"
elif num_classes > 200 or num_classes == 0:
metric = "rmse"
else:
raise ValueError(f"Number of classes {num_classes} not supported")
return metric
def create_base_requirement(self):
metric = self.get_metric()
req = BASE_USER_REQUIREMENT.format(datasetname=self.name, target_col=self.target_col, metric=metric)
return req
def save_dataset(self, target_col):
df, test_df = self.get_raw_dataset()
if not self.check_dataset_exists() or self.force_update:
print(f"Saving Dataset {self.name} in {self.dataset_dir}")
self.split_and_save(df, target_col, test_df=test_df)
else:
print(f"Dataset {self.name} already exists")
if not self.check_datasetinfo_exists() or self.force_update:
print(f"Saving Dataset info for {self.name}")
dataset_info = self.get_dataset_info()
self.save_datasetinfo(dataset_info)
else:
print(f"Dataset info for {self.name} already exists")
def save_datasetinfo(self, dataset_info):
with open(Path(self.dataset_dir, self.name, "dataset_info.json"), "w", encoding="utf-8") as file:
# utf-8 encoding is required
json.dump(dataset_info, file, indent=4, ensure_ascii=False)
def save_split_datasets(self, df, split, target_col=None):
path = Path(self.dataset_dir, self.name)
df.to_csv(Path(path, f"split_{split}.csv"), index=False)
if target_col:
df_wo_target = df.drop(columns=[target_col])
df_wo_target.to_csv(Path(path, f"split_{split}_wo_target.csv"), index=False)
df_target = df[[target_col]].copy()
if target_col != "target":
df_target["target"] = df_target[target_col]
df_target = df_target.drop(columns=[target_col])
df_target.to_csv(Path(path, f"split_{split}_target.csv"), index=False)
def split_and_save(self, df, target_col, test_df=None):
if not target_col:
raise ValueError("Target column not provided")
if test_df is None:
train, test = train_test_split(df, test_size=1 - TRAIN_TEST_SPLIT, random_state=SEED)
else:
train = df
test = test_df
train, dev = train_test_split(train, test_size=1 - TRAIN_DEV_SPLIT, random_state=SEED)
self.save_split_datasets(train, "train")
self.save_split_datasets(dev, "dev", target_col)
self.save_split_datasets(test, "test", target_col)
class OpenMLExpDataset(ExpDataset):
def __init__(self, name, dataset_dir, dataset_id, **kwargs):
self.dataset_id = dataset_id
self.dataset = openml.datasets.get_dataset(
self.dataset_id, download_data=False, download_qualities=False, download_features_meta_data=True
)
self.name = self.dataset.name
self.target_col = self.dataset.default_target_attribute
super().__init__(self.name, dataset_dir, target_col=self.target_col, **kwargs)
def get_raw_dataset(self):
dataset = self.dataset
dataset_df, *_ = dataset.get_data()
raw_dir = Path(self.dataset_dir, self.name, "raw")
os.makedirs(raw_dir, exist_ok=True)
dataset_df.to_csv(Path(raw_dir, "train.csv"), index=False)
return dataset_df, None
def get_dataset_info(self):
dataset_info = super().get_dataset_info()
dataset = self.dataset
dataset_info["name"] = dataset.name
dataset_info["description"] = dataset.description
dataset_info["metadata"].update(dataset.qualities)
return dataset_info
async def process_dataset(dataset, solution_designer: SolutionDesigner, save_analysis_pool, datasets_dict):
if save_analysis_pool:
await solution_designer.generate_solutions(dataset.get_dataset_info(), dataset.name)
dataset_dict = create_dataset_dict(dataset)
datasets_dict["datasets"][dataset.name] = dataset_dict
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--force_update", action="store_true", help="Force update datasets")
parser.add_argument("--save_analysis_pool", action="store_true", help="Save analysis pool")
parser.add_argument(
"--no_save_analysis_pool", dest="save_analysis_pool", action="store_false", help="Do not save analysis pool"
)
parser.set_defaults(save_analysis_pool=True)
return parser.parse_args()
if __name__ == "__main__":
datasets_dir = DATA_CONFIG["datasets_dir"]
args = parse_args()
force_update = args.force_update
save_analysis_pool = args.save_analysis_pool
datasets_dict = {"datasets": {}}
solution_designer = SolutionDesigner()
for dataset_id in OPENML_DATASET_IDS:
openml_dataset = OpenMLExpDataset("", datasets_dir, dataset_id, force_update=force_update)
asyncio.run(process_dataset(openml_dataset, solution_designer, save_analysis_pool, datasets_dict))
for dataset_name, target_col in CUSTOM_DATASETS:
custom_dataset = ExpDataset(dataset_name, datasets_dir, target_col=target_col, force_update=force_update)
asyncio.run(process_dataset(custom_dataset, solution_designer, save_analysis_pool, datasets_dict))
for dataset_name, target_col in DSAGENT_DATASETS:
custom_dataset = ExpDataset(dataset_name, datasets_dir, target_col=target_col, force_update=force_update)
asyncio.run(process_dataset(custom_dataset, solution_designer, save_analysis_pool, datasets_dict))
save_datasets_dict_to_yaml(datasets_dict)

140
expo/data/hf_data.py Normal file

@@ -0,0 +1,140 @@
import asyncio
import io
import os
from pathlib import Path
import pandas as pd
from datasets import load_dataset
from PIL import Image
from expo.data.dataset import (
ExpDataset,
parse_args,
process_dataset,
save_datasets_dict_to_yaml,
)
from expo.insights.solution_designer import SolutionDesigner
from expo.utils import DATA_CONFIG
HFDATSETS = [
{"name": "sms_spam", "dataset_name": "ucirvine/sms_spam", "target_col": "label", "modality": "text"},
{"name": "banking77", "dataset_name": "PolyAI/banking77", "target_col": "label", "modality": "text"},
{"name": "gnad10", "dataset_name": "community-datasets/gnad10", "target_col": "label", "modality": "text"},
{
"name": "oxford-iiit-pet",
"dataset_name": "timm/oxford-iiit-pet",
"image_col": "image",
"target_col": "label",
"modality": "image",
},
{
"name": "stanford_cars",
"dataset_name": "tanganke/stanford_cars",
"image_col": "image",
"target_col": "label",
"modality": "image",
},
{
"name": "fashion_mnist",
"dataset_name": "zalando-datasets/fashion_mnist",
"image_col": "image",
"target_col": "label",
"modality": "image",
},
]
class HFExpDataset(ExpDataset):
train_ratio = 0.6
dev_ratio = 0.2
test_ratio = 0.2
def __init__(self, name, dataset_dir, dataset_name, **kwargs):
self.name = name
self.dataset_dir = dataset_dir
self.dataset_name = dataset_name
self.modality = kwargs.get("modality", "")
self.target_col = kwargs.get("target_col", "label")
self.image_col = kwargs.get("image_col", "image")
self.dataset = load_dataset(self.dataset_name, trust_remote_code=True)
super().__init__(self.name, dataset_dir, **kwargs)
def get_raw_dataset(self):
raw_dir = Path(self.dataset_dir, self.name, "raw")
raw_dir.mkdir(parents=True, exist_ok=True)
if os.path.exists(Path(raw_dir, "train.csv")):
df = pd.read_csv(Path(raw_dir, "train.csv"), encoding="utf-8")
else:
df = self.dataset["train"].to_pandas()
if self.modality == "image":
df = self.save_images_and_update_df(df, raw_dir, "train")
df.to_csv(Path(raw_dir, "train.csv"), index=False, encoding="utf-8")
if os.path.exists(Path(raw_dir, "test.csv")):
test_df = pd.read_csv(Path(raw_dir, "test.csv"), encoding="utf-8")
else:
if self.dataset and "test" in self.dataset:
test_df = self.dataset["test"].to_pandas()
if self.modality == "image":
test_df = self.save_images_and_update_df(test_df, raw_dir, "test")
test_df.to_csv(Path(raw_dir, "test.csv"), index=False, encoding="utf-8")
else:
test_df = None
return df, test_df
def save_images_and_update_df(self, df, raw_dir, split):
abs_image_dir = Path(raw_dir, f"{split}_images")
rel_image_dir = f"raw/{split}_images"
abs_image_dir.mkdir(parents=True, exist_ok=True)
def process_image(idx, row):
image_bytes = row[self.image_col]["bytes"]
image = Image.open(io.BytesIO(image_bytes))
if image.mode == "RGBA":
image = image.convert("RGB")
img_path = Path(abs_image_dir, f"{idx}.jpg")
rel_img_path = f"{rel_image_dir}/{idx}.jpg"
image.save(img_path)
return rel_img_path
df["image"] = df.apply(lambda row: process_image(row.name, row), axis=1)
return df
def get_df_head(self, raw_df):
examples = []
for i in range(5):
examples.append(raw_df.iloc[i].to_dict())
return examples
def get_dataset_info(self):
dataset_info = super().get_dataset_info()
dataset = self.dataset
dataset_info["description"] = dataset["train"].info.description
return dataset_info
if __name__ == "__main__":
dataset_dir = DATA_CONFIG["datasets_dir"]
args = parse_args()
force_update = args.force_update
save_analysis_pool = args.save_analysis_pool
datasets_dict = {"datasets": {}}
solution_designer = SolutionDesigner()
for dataset_meta in HFDATSETS:
hf_dataset = HFExpDataset(
dataset_meta["name"],
dataset_dir,
dataset_meta["dataset_name"],
target_col=dataset_meta["target_col"],
image_col=dataset_meta.get("image_col", ""),
force_update=force_update,
modality=dataset_meta["modality"],
)
asyncio.run(process_dataset(hf_dataset, solution_designer, save_analysis_pool, datasets_dict))
save_datasets_dict_to_yaml(datasets_dict, "hf_datasets.yaml")

225
expo/datasets.yaml Normal file

@@ -0,0 +1,225 @@
datasets:
titanic:
dataset: 04_titanic
metric: f1
target_col: Survived
user_requirement: "This is a 04_titanic dataset. Your goal is to predict the target\
\ column `Survived`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
house-prices:
dataset: 05_house-prices-advanced-regression-techniques
metric: rmse
target_col: SalePrice
user_requirement: "This is a 05_house-prices-advanced-regression-techniques dataset.\
\ Your goal is to predict the target column `SalePrice`.\nPerform data analysis,\
\ data preprocessing, feature engineering, and modeling to predict the target.\
\ \nReport rmse on the eval data. Do not plot or make any visualizations.\n"
santander-customer:
dataset: 06_santander-customer-transaction-prediction
metric: f1
target_col: target
user_requirement: "This is a 06_santander-customer-transaction-prediction dataset.\
\ Your goal is to predict the target column `target`.\nPerform data analysis,\
\ data preprocessing, feature engineering, and modeling to predict the target.\
\ \nReport f1 on the eval data. Do not plot or make any visualizations.\n"
icr:
dataset: 07_icr-identify-age-related-conditions
metric: f1
target_col: Class
user_requirement: "This is a 07_icr-identify-age-related-conditions dataset. Your\
\ goal is to predict the target column `Class`.\nPerform data analysis, data\
\ preprocessing, feature engineering, and modeling to predict the target. \n\
Report f1 on the eval data. Do not plot or make any visualizations.\n"
Click_prediction_small:
dataset: Click_prediction_small
metric: f1
target_col: click
user_requirement: "This is a Click_prediction_small dataset. Your goal is to predict\
\ the target column `click`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 on the eval data.\
\ Do not plot or make any visualizations.\n"
GesturePhaseSegmentationProcessed:
dataset: GesturePhaseSegmentationProcessed
metric: f1 weighted
target_col: Phase
user_requirement: "This is a GesturePhaseSegmentationProcessed dataset. Your goal\
\ is to predict the target column `Phase`.\nPerform data analysis, data preprocessing,\
\ feature engineering, and modeling to predict the target. \nReport f1 weighted\
\ on the eval data. Do not plot or make any visualizations.\n"
Moneyball:
dataset: Moneyball
metric: rmse
target_col: RS
user_requirement: "This is a Moneyball dataset. Your goal is to predict the target\
\ column `RS`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport rmse on the eval data. Do not\
\ plot or make any visualizations.\n"
SAT11-HAND-runtime-regression:
dataset: SAT11-HAND-runtime-regression
metric: rmse
target_col: runtime
user_requirement: "This is a SAT11-HAND-runtime-regression dataset. Your goal\
\ is to predict the target column `runtime`.\nPerform data analysis, data preprocessing,\
\ feature engineering, and modeling to predict the target. \nReport rmse on\
\ the eval data. Do not plot or make any visualizations.\n"
boston:
dataset: boston
metric: rmse
target_col: MEDV
user_requirement: "This is a boston dataset. Your goal is to predict the target\
\ column `MEDV`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport rmse on the eval data. Do not\
\ plot or make any visualizations.\n"
colleges:
dataset: colleges
metric: rmse
target_col: percent_pell_grant
user_requirement: "This is a colleges dataset. Your goal is to predict the target\
\ column `percent_pell_grant`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport rmse on the eval\
\ data. Do not plot or make any visualizations.\n"
concrete-strength:
dataset: concrete-strength
metric: rmse
target_col: Strength
user_requirement: "This is a concrete-strength dataset. Your goal is to predict\
\ the target column `Strength`.\nPerform data analysis, data preprocessing,\
\ feature engineering, and modeling to predict the target. \nReport rmse on\
\ the eval data. Do not plot or make any visualizations.\n"
credit-g:
dataset: credit-g
metric: f1
target_col: class
user_requirement: "This is a credit-g dataset. Your goal is to predict the target\
\ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
diamonds:
dataset: diamonds
metric: rmse
target_col: price
user_requirement: "This is a diamonds dataset. Your goal is to predict the target\
\ column `price`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport rmse on the eval data. Do not\
\ plot or make any visualizations.\n"
jasmine:
dataset: jasmine
metric: f1
target_col: class
user_requirement: "This is a jasmine dataset. Your goal is to predict the target\
\ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
kc1:
dataset: kc1
metric: f1
target_col: defects
user_requirement: "This is a kc1 dataset. Your goal is to predict the target column\
\ `defects`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
kick:
dataset: kick
metric: f1
target_col: IsBadBuy
user_requirement: "This is a kick dataset. Your goal is to predict the target\
\ column `IsBadBuy`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
mfeat-factors:
dataset: mfeat-factors
metric: f1 weighted
target_col: class
user_requirement: "This is a mfeat-factors dataset. Your goal is to predict the\
\ target column `class`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"
segment:
dataset: segment
metric: f1 weighted
target_col: class
user_requirement: "This is a segment dataset. Your goal is to predict the target\
\ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 weighted on the eval data.\
\ Do not plot or make any visualizations.\n"
smoker-status:
dataset: smoker-status
metric: f1
target_col: smoking
user_requirement: "This is a smoker-status dataset. Your goal is to predict the\
\ target column `smoking`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 on the eval data.\
\ Do not plot or make any visualizations.\n"
software-defects:
dataset: software-defects
metric: f1
target_col: defects
user_requirement: "This is a software-defects dataset. Your goal is to predict\
\ the target column `defects`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 on the eval data.\
\ Do not plot or make any visualizations.\n"
steel-plates-fault:
dataset: steel-plates-fault
metric: f1 weighted
target_col: target
user_requirement: "This is a steel-plates-fault dataset. Your goal is to predict\
\ the target column `target`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"
wine-quality-white:
dataset: wine-quality-white
metric: f1 weighted
target_col: Class
user_requirement: "This is a wine-quality-white dataset. Your goal is to predict\
\ the target column `Class`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"
banking77:
dataset: banking77
metric: f1 weighted
target_col: label
user_requirement: "This is a banking77 dataset. Your goal is to predict the target\
\ column `label`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 weighted on the eval data.\
\ Do not plot or make any visualizations.\n"
fashion_mnist:
dataset: fashion_mnist
metric: f1 weighted
target_col: label
user_requirement: "This is a fashion_mnist dataset. Your goal is to predict the\
\ target column `label`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"
gnad10:
dataset: gnad10
metric: f1 weighted
target_col: label
user_requirement: "This is a gnad10 dataset. Your goal is to predict the target\
\ column `label`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 weighted on the eval data.\
\ Do not plot or make any visualizations.\n"
oxford-iiit-pet:
dataset: oxford-iiit-pet
metric: f1 weighted
target_col: label
user_requirement: "This is a oxford-iiit-pet dataset. Your goal is to predict\
\ the target column `label`.\nPerform data analysis, data preprocessing,\
\ feature engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"
sms_spam:
dataset: sms_spam
metric: f1
target_col: label
user_requirement: "This is a sms_spam dataset. Your goal is to predict the target\
\ column `label`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
stanford_cars:
dataset: stanford_cars
metric: f1 weighted
target_col: label
user_requirement: "This is a stanford_cars dataset. Your goal is to predict the\
\ target column `label`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"

49
expo/evaluation/evaluation.py Normal file

@@ -0,0 +1,49 @@
from pathlib import Path

import numpy as np
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error, roc_auc_score


def evaluate_score(pred, gt, metric):
    if metric == "accuracy":
        return accuracy_score(gt, pred)
    elif metric == "f1":
        unique_classes = sorted(list(np.unique(gt)))
        if 1 in unique_classes and 0 in unique_classes:
            pos_label = 1
        else:
            pos_label = unique_classes[0] if len(unique_classes) == 2 else None
        return f1_score(gt, pred, pos_label=pos_label)
    elif metric == "f1 weighted":
        return f1_score(gt, pred, average="weighted")
    elif metric == "roc_auc":
        return roc_auc_score(gt, pred)
    elif metric == "rmse":
        return mean_squared_error(gt, pred, squared=False)
    elif metric == "log rmse":
        return mean_squared_error(np.log1p(gt), np.log1p(pred), squared=False)
    else:
        raise ValueError(f"Metric {metric} not supported")


def node_evaluate_score_sela(node):
    preds = node.get_and_move_predictions("test")["target"]
    gt = node.get_gt("test")["target"]
    metric = node.state["dataset_config"]["metric"]
    return evaluate_score(preds, gt, metric)


def node_evaluate_score_mlebench(node):
    # TODO
    from mlebench.grade import grade_csv
    from mlebench.registry import registry

    competition_id = node.state["task"]
    data_dir = Path(node.state["custom_dataset_dir"]).parent.parent.parent  # prepared/public/../../../
    pred_path = node.get_predictions_path("test")
    new_registry = registry.set_data_dir(data_dir)
    competition = new_registry.get_competition(competition_id)
    submission = Path(pred_path)
    report = grade_csv(submission, competition).to_dict()
    report["submission_path"] = str(submission)
    return report


@@ -0,0 +1,162 @@
import textwrap
import matplotlib.pyplot as plt
import networkx as nx
from expo.MCTS import Node
NODE_TEMPLATE = """\
[Node {id}]
Plans:
{plans}
Simulated: {simulated}
Score: {score}, Visits: {num_visits}
"""
NODE_SIZE = 12000
NODE_FONT_SIZE = 18
def get_role_plans(role):
plans = role.planner.plan.tasks
instruct_plans = [f"{i+1}. {task.instruction}" for i, task in enumerate(plans)]
return instruct_plans
def get_tree_text(node: Node):
role_dict = {}
code_set = set()
def load_role(node):
if node.id not in role_dict:
role_dict[node.id] = node.load_role()
return role_dict[node.id]
def visualize_node(node: Node, previous_plans=None):
role = load_role(node)
node_id = node.id
plans = role.planner.plan.tasks
instruct_plans = [f"{i+1}. {task.instruction}" for i, task in enumerate(plans)]
if previous_plans is not None:
instruct_plans = [plan for plan, prev_plan in zip(instruct_plans, previous_plans) if plan != prev_plan]
instruct_plans_text = "\n".join(instruct_plans)
simulated = role.state_saved
score = f"avg score: {node.avg_value()}, simulated score: {node.raw_reward}"
num_visits = node.visited
return NODE_TEMPLATE.format(
id=node_id, plans=instruct_plans_text, simulated=simulated, score=score, num_visits=num_visits
)
def visualize_tree_text(node, depth=0, previous_plans=None):
text = ""
if node is not None:
text += visualize_node(node, previous_plans)
role = load_role(node)
code_set.update({task.instruction for task in role.planner.plan.tasks})
previous_plans = get_role_plans(role)
for child in node.children:
text += textwrap.indent(visualize_tree_text(child, depth + 1, previous_plans), "\t")
return text
num_simulations = node.visited
text = f"Number of simulations: {num_simulations}\n"
text += visualize_tree_text(node)
return text, len(code_set)
def get_node_color(node):
if node["visits"] == 0:
return "#D3D3D3"
else:
# The higher the avg_value, the more intense the color
# avg_value is between 0 and 1
avg_value = node["avg_value"]
# Convert avg_value to a color ranging from red (low) to green (high)
red = int(255 * (1 - avg_value))
green = int(255 * avg_value)
return f"#{red:02X}{green:02X}00"
def visualize_tree(graph, show_instructions=False, save_path=""):
# Use a hierarchical layout for tree-like visualization
pos = nx.spring_layout(graph, k=0.9, iterations=50)
plt.figure(figsize=(30, 20)) # Further increase figure size for better visibility
# Calculate node levels
root = "0"
levels = nx.single_source_shortest_path_length(graph, root)
max_level = max(levels.values()) or 1  # guard against a single-node tree, which would otherwise divide by zero below
# Adjust y-coordinates based on levels and x-coordinates to prevent overlap
nodes_by_level = {}
for node, level in levels.items():
if level not in nodes_by_level:
nodes_by_level[level] = []
nodes_by_level[level].append(node)
for level, nodes in nodes_by_level.items():
y = 1 - level / max_level
x_step = 1.0 / (len(nodes) + 1)
for i, node in enumerate(sorted(nodes)):
pos[node] = ((i + 1) * x_step, y)
# Draw edges
nx.draw_networkx_edges(graph, pos, edge_color="gray", arrows=True, arrowsize=40, width=3)
# Draw nodes
node_colors = [get_node_color(graph.nodes[node]) for node in graph.nodes]
nx.draw_networkx_nodes(graph, pos, node_size=NODE_SIZE, node_color=node_colors)
# Add labels to nodes
labels = nx.get_node_attributes(graph, "label")
nx.draw_networkx_labels(graph, pos, labels, font_size=NODE_FONT_SIZE)
if show_instructions:
# Add instructions to the right side of nodes
instructions = nx.get_node_attributes(graph, "instruction")
for node, (x, y) in pos.items():
wrapped_text = textwrap.fill(instructions[node], width=30) # Adjust width as needed
plt.text(x + 0.05, y, wrapped_text, fontsize=15, ha="left", va="center")
plt.title("MCTS Tree Visualization", fontsize=40)
plt.axis("off") # Turn off axis
plt.tight_layout()
if save_path:
plt.savefig(save_path)
plt.show()
def build_tree_recursive(graph, parent_id, node, start_task_id=2):
"""
Recursively builds the entire tree starting from the root node.
Adds nodes and edges to the NetworkX graph.
"""
role = node.load_role()
depth = node.get_depth()
if depth == 0:
instruction = "\n\n".join([role.planner.plan.tasks[i].instruction for i in range(start_task_id)])
else:
instruction = role.planner.plan.tasks[depth + start_task_id - 1].instruction
print(instruction)
# Add the current node with attributes to the graph
dev_score = node.raw_reward.get("dev_score", 0) * 100
avg_score = node.avg_value() * 100
graph.add_node(
parent_id,
label=f"{node.id}\nAvg: {avg_score:.1f}\nScore: {dev_score:.1f}\nVisits: {node.visited}",
avg_value=node.avg_value(),
dev_score=dev_score,
visits=node.visited,
instruction=instruction,
)
# Stopping condition: if the node has no children, return
if not node.children:
return
# Recursively create all child nodes
for i, child in enumerate(node.children):
child_id = f"{parent_id}-{i}"
graph.add_edge(parent_id, child_id)
build_tree_recursive(graph, child_id, child)
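As a small illustration of the color mapping in `get_node_color` above (a hypothetical snippet, not part of the file): unvisited nodes render grey, and visited nodes interpolate from red to green as the average value grows.

```python
# Hypothetical check of the node color mapping defined above.
print(get_node_color({"visits": 0, "avg_value": 0.90}))  # "#D3D3D3"  (never simulated -> grey)
print(get_node_color({"visits": 3, "avg_value": 0.25}))  # "#BF3F00"  (low value  -> mostly red)
print(get_node_color({"visits": 5, "avg_value": 0.75}))  # "#3FBF00"  (high value -> mostly green)
```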

View file

31
expo/experimenter/aide.py Normal file
View file

@@ -0,0 +1,31 @@
import aide
import os
import time
os.environ["OPENAI_API_KEY"] = "sk-xxx"
os.environ["OPENAI_BASE_URL"] = "your url"
start_time = time.time()
data_dir = "xxx/data/titanic"
goal = f"""
# User requirement
({data_dir}, 'This is a 04_titanic dataset. Your goal is to predict the target column `Survived`.\nPerform data analysis, data preprocessing, feature engineering, and modeling to predict the target. \nReport f1 on the eval data. Do not plot or make any visualizations.\n')
# Data dir
training (with labels): train.csv
testing (without labels): test.csv
dataset description: dataset_info.json (You can use this file to get additional information about the dataset)"""
exp = aide.Experiment(
data_dir=data_dir, # replace this with your own directory
goal=goal,
eval="f1", # replace with your own evaluation metric
)
best_solution = exp.run(steps=10)
print(f"Best solution has validation metric: {best_solution.valid_metric}")
print(f"Best solution code: {best_solution.code}")
end_time = time.time()
execution_time = end_time - start_time
print(f"run time : {execution_time} seconds")

55
expo/experimenter/aug.py Normal file
View file

@@ -0,0 +1,55 @@
from expo.experimenter.experimenter import Experimenter
from expo.insights.instruction_generator import InstructionGenerator
from expo.research_assistant import ResearchAssistant
from expo.utils import get_exp_pool_path
EXPS_PROMPT = """
When doing the tasks, you can refer to the insights below:
{experience}
"""
class AugExperimenter(Experimenter):
result_path: str = "results/aug"
async def run_experiment(self):
# state = create_initial_state(self.args.task, start_task_id=1, data_config=self.data_config, low_is_better=self.args.low_is_better, name="")
user_requirement = self.state["requirement"]
exp_pool_path = get_exp_pool_path(self.args.task, self.data_config, pool_name="ds_analysis_pool")
exp_pool = InstructionGenerator.load_analysis_pool(
exp_pool_path, use_fixed_insights=self.args.use_fixed_insights
)
if self.args.aug_mode == "single":
exps = InstructionGenerator._random_sample(exp_pool, self.args.num_experiments)
exps = [exp["Analysis"] for exp in exps]
elif self.args.aug_mode == "set":
exps = []
for i in range(self.args.num_experiments):
exp_set = InstructionGenerator.sample_instruction_set(exp_pool)
exp_set_text = "\n".join([f"{exp['task_id']}: {exp['Analysis']}" for exp in exp_set])
exps.append(exp_set_text)
else:
raise ValueError(f"Invalid mode: {self.args.aug_mode}")
results = []
for i in range(self.args.num_experiments):
di = ResearchAssistant(
node_id=str(i), use_reflection=self.args.reflection, role_timeout=self.args.role_timeout
)
di.role_dir = f"{di.role_dir}_{self.args.task}"
requirement = user_requirement + EXPS_PROMPT.format(experience=exps[i])
print(requirement)
score_dict = await self.run_di(di, requirement, run_idx=i)
results.append(
{
"idx": i,
"score_dict": score_dict,
"aug_mode": self.args.aug_mode,
"insights": exps[i],
"user_requirement": requirement,
"args": vars(self.args),
}
)
results = self.summarize_results(results)
self.save_result(results)

126
expo/experimenter/autogluon.py Normal file
View file

@@ -0,0 +1,126 @@
from datetime import datetime
from expo.experimenter.custom import CustomExperimenter
import os
import pandas as pd
class AGRunner:
def __init__(self, state=None):
self.state = state
self.datasets = self.state["datasets_dir"]
def run(self):
from autogluon.tabular import TabularDataset, TabularPredictor
train_path = self.datasets["train"]
dev_path = self.datasets["dev"]
dev_wo_target_path = self.datasets["dev_wo_target"]
test_wo_target_path = self.datasets["test_wo_target"]
target_col = self.state["dataset_config"]["target_col"]
train_data = TabularDataset(train_path)
dev_data = TabularDataset(dev_path)
dev_wo_target_data = TabularDataset(dev_wo_target_path)
test_data = TabularDataset(test_wo_target_path)
eval_metric = self.state["dataset_config"]["metric"].replace(" ", "_")
predictor = TabularPredictor(
label=target_col,
eval_metric=eval_metric,
path="AutogluonModels/ag-{}-{}".format(self.state["task"], datetime.now().strftime("%y%m%d_%H%M")),
).fit(train_data=train_data, tuning_data=dev_data, num_gpus=1)
dev_preds = predictor.predict(dev_wo_target_data)
test_preds = predictor.predict(test_data)
return {"test_preds": test_preds, "dev_preds": dev_preds}
def run_multimodal(self):
from autogluon.multimodal import MultiModalPredictor
target_col = self.state["dataset_config"]["target_col"]
train_path = self.datasets["train"]
dev_path = self.datasets["dev"]
dev_wo_target_path = self.datasets["dev_wo_target"]  # dev split without the target column
test_wo_target_path = self.datasets["test_wo_target"]
eval_metric = self.state["dataset_config"]["metric"].replace(" ", "_")
# Load the datasets
train_data, dev_data, dev_wo_target_data, test_data = self.load_split_dataset(
train_path, dev_path, dev_wo_target_path, test_wo_target_path
)
# Create and fit the predictor
predictor = MultiModalPredictor(
label=target_col,
eval_metric=eval_metric,
path="AutogluonModels/ag-{}-{}".format(self.state["task"], datetime.now().strftime("%y%m%d_%H%M")),
).fit(train_data=train_data, tuning_data=dev_data)
# Make predictions on dev and test datasets
dev_preds = predictor.predict(dev_wo_target_data)
test_preds = predictor.predict(test_data)
# Return predictions for dev and test datasets
return {
"dev_preds": dev_preds,
"test_preds": test_preds
}
def load_split_dataset(self, train_path, dev_path, dev_wo_target_path, test_wo_target_path):
"""
Loads training, dev, and test datasets from given file paths
Args:
train_path (str): Path to the training dataset.
dev_path (str): Path to the dev dataset with target labels.
dev_wo_target_path (str): Path to the dev dataset without target labels.
test_wo_target_path (str): Path to the test dataset without target labels.
Returns:
train_data (pd.DataFrame): Loaded training dataset with updated image paths.
dev_data (pd.DataFrame): Loaded dev dataset with updated image paths.
dev_wo_target_data (pd.DataFrame): Loaded dev dataset without target labels and updated image paths.
test_data (pd.DataFrame): Loaded test dataset with updated image paths.
"""
# Root path prepended to the relative image paths; note this is hardcoded to a local dataset directory, so adjust it to your own environment before running
root_folder = os.path.join("F:/Download/Dataset/", self.state["task"])
# Load the datasets
train_data = pd.read_csv(train_path)
dev_data = pd.read_csv(dev_path) # Load dev dataset with target labels
dev_wo_target_data = pd.read_csv(dev_wo_target_path) # Load dev dataset without target labels
test_data = pd.read_csv(test_wo_target_path)
# Get the name of the first column (assuming it's the image path column)
image_column = train_data.columns[0]
# Append root folder path to the image column in each dataset
train_data[image_column] = train_data[image_column].apply(lambda x: os.path.join(root_folder, x))
dev_data[image_column] = dev_data[image_column].apply(lambda x: os.path.join(root_folder, x))
dev_wo_target_data[image_column] = dev_wo_target_data[image_column].apply(
lambda x: os.path.join(root_folder, x))
test_data[image_column] = test_data[image_column].apply(lambda x: os.path.join(root_folder, x))
return train_data, dev_data, dev_wo_target_data, test_data
class GluonExperimenter(CustomExperimenter):
result_path: str = "results/autogluon"
def __init__(self, args, **kwargs):
super().__init__(args, **kwargs)
self.framework = AGRunner(self.state)
self.is_multimodal = getattr(args, "is_multimodal", False)
async def run_experiment(self):
if not self.is_multimodal:
result = self.framework.run()
else:
result = self.framework.run_multimodal()
assert result is not None
user_requirement = self.state["requirement"]
dev_preds = result["dev_preds"]
test_preds = result["test_preds"]
score_dict = {
"dev_score": self.evaluate_predictions(dev_preds, "dev"),
"test_score": self.evaluate_predictions(test_preds, "test"),
}
results = [0, {"score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)}]
self.save_result(results)

96
expo/experimenter/autosklearn.py Normal file
View file

@@ -0,0 +1,96 @@
from datetime import datetime
import pandas as pd
from expo.experimenter.custom import CustomExperimenter
from expo.evaluation.evaluation import evaluate_score
from functools import partial
def custom_scorer(y_true, y_pred, metric_name):
return evaluate_score(y_pred, y_true, metric_name)
class ASRunner:
time_limit = 600
def __init__(self, state=None):
self.state = state
self.datasets = self.state["datasets_dir"]
def create_autosklearn_scorer(self, metric_name):
from autosklearn.metrics import make_scorer
return make_scorer(
name=metric_name, score_func=partial(custom_scorer, metric_name=metric_name)
)
def run(self):
import autosklearn.classification
import autosklearn.regression
train_path = self.datasets["train"]
dev_wo_target_path = self.datasets["dev_wo_target"]
test_wo_target_path = self.datasets["test_wo_target"]
target_col = self.state["dataset_config"]["target_col"]
train_data = pd.read_csv(train_path)
dev_data = pd.read_csv(dev_wo_target_path)
test_data = pd.read_csv(test_wo_target_path)
eval_metric = self.state["dataset_config"]["metric"]
X_train = train_data.drop(columns=[target_col])
y_train = train_data[target_col]
if eval_metric == "rmse":
automl = autosklearn.regression.AutoSklearnRegressor(
time_left_for_this_task=self.time_limit,
metric=self.create_autosklearn_scorer(eval_metric),
memory_limit=8192,
tmp_folder="AutosklearnModels/as-{}-{}".format(
self.state["task"], datetime.now().strftime("%y%m%d_%H%M")
),
n_jobs=-1,
)
elif eval_metric in ["f1", "f1 weighted"]:
automl = autosklearn.classification.AutoSklearnClassifier(
time_left_for_this_task=self.time_limit,
metric=self.create_autosklearn_scorer(eval_metric),
memory_limit=8192,
tmp_folder="AutosklearnModels/as-{}-{}".format(
self.state["task"], datetime.now().strftime("%y%m%d_%H%M")
),
n_jobs=-1,
)
else:
raise ValueError(f"Unsupported metric: {eval_metric}")
automl.fit(X_train, y_train)
dev_preds = automl.predict(dev_data)
test_preds = automl.predict(test_data)
return {"test_preds": test_preds, "dev_preds": dev_preds}
class AutoSklearnExperimenter(CustomExperimenter):
result_path: str = "results/autosklearn"
def __init__(self, args, **kwargs):
super().__init__(args, **kwargs)
self.framework = ASRunner(self.state)
async def run_experiment(self):
result = self.framework.run()
user_requirement = self.state["requirement"]
dev_preds = result["dev_preds"]
test_preds = result["test_preds"]
score_dict = {
"dev_score": self.evaluate_predictions(dev_preds, "dev"),
"test_score": self.evaluate_predictions(test_preds, "test"),
}
results = [
0,
{
"score_dict": score_dict,
"user_requirement": user_requirement,
"args": vars(self.args),
},
]
self.save_result(results)

62
expo/experimenter/custom.py Normal file
View file

@@ -0,0 +1,62 @@
import os
import pandas as pd
from expo.evaluation.evaluation import evaluate_score
from expo.experimenter.experimenter import Experimenter
from expo.MCTS import create_initial_state
class CustomExperimenter(Experimenter):
result_path: str = "results/custom"
def __init__(self, args, **kwargs):
super().__init__(args, **kwargs)
self.framework = kwargs.get("framework", None) # todo
self.task = kwargs.get("task", self.args.task)
self.low_is_better = kwargs.get("low_is_better", self.args.low_is_better)
self.name = kwargs.get("name", "")
self.result_path = f"results/custom_{self.name}"
self.state = create_initial_state(
self.task,
start_task_id=1,
data_config=self.data_config,
args=self.args,
)
async def run_experiment(self):
user_requirement = self.state["requirement"]
preds = self.framework.run(user_requirement)
test_preds = preds["test_preds"]
dev_preds = preds["dev_preds"]
score_dict = {
"dev_score": self.evaluate_predictions(dev_preds, "dev"),
"test_score": self.evaluate_predictions(test_preds, "test"),
}
results = {"score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)}
self.save_result(results)
def evaluate_pred_files(self, dev_pred_path, test_pred_path):
dev_preds = pd.read_csv(dev_pred_path)["target"]
test_preds = pd.read_csv(test_pred_path)["target"]
score_dict = {
"dev_score": self.evaluate_score(dev_preds, "dev"),
"test_score": self.evaluate_score(test_preds, "test"),
}
return score_dict
def evaluate_predictions(self, preds, split):
metric = self.state["dataset_config"]["metric"]
gt_path = os.path.join(self.state["datasets_dir"][f"{split}_target"])
gt = pd.read_csv(gt_path)["target"]
score = evaluate_score(preds, gt, metric)
return score
def load_datasets(self):
train_path = self.state["datasets_dir"]["train"]
dev_path = self.state["datasets_dir"]["dev"]
test_path = self.state["datasets_dir"]["test"]
train = pd.read_csv(train_path)
dev = pd.read_csv(dev_path)
test = pd.read_csv(test_path)
return train, dev, test
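To make the extension point explicit: `CustomExperimenter` only requires the injected framework to expose a `run(user_requirement)` method returning `dev_preds`/`test_preds`, the same contract the AutoGluon and auto-sklearn runners in this merge request follow. The sketch below is hypothetical; the `MajorityClassRunner` name and its logic are illustrative and not part of the repository.

```python
# Hypothetical framework runner satisfying the CustomExperimenter contract above.
import pandas as pd


class MajorityClassRunner:
    def __init__(self, state):
        self.state = state

    def run(self, user_requirement):
        # Predict a constant class for every row, just to exercise the evaluation path.
        dev = pd.read_csv(self.state["datasets_dir"]["dev_wo_target"])
        test = pd.read_csv(self.state["datasets_dir"]["test_wo_target"])
        return {
            "dev_preds": pd.Series([0] * len(dev)),
            "test_preds": pd.Series([0] * len(test)),
        }


# Usage sketch (args as produced by get_args() in expo/run_experiment.py):
#   experimenter = CustomExperimenter(args)
#   experimenter.framework = MajorityClassRunner(experimenter.state)
#   await experimenter.run_experiment()
```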

135
expo/experimenter/experimenter.py Normal file
View file

@@ -0,0 +1,135 @@
import datetime
import json
import os
import numpy as np
import pandas as pd
from expo.evaluation.evaluation import evaluate_score
from expo.MCTS import create_initial_state
from expo.research_assistant import ResearchAssistant
from expo.utils import DATA_CONFIG, save_notebook
class Experimenter:
result_path: str = "results/base"
data_config = DATA_CONFIG
start_task_id = 1
def __init__(self, args, **kwargs):
self.args = args
self.start_time_raw = datetime.datetime.now()
self.start_time = self.start_time_raw.strftime("%Y%m%d%H%M")
self.state = create_initial_state(
self.args.task,
start_task_id=self.start_task_id,
data_config=self.data_config,
args=self.args,
)
async def run_di(self, di, user_requirement, run_idx):
max_retries = 3
num_runs = 1
run_finished = False
while num_runs <= max_retries and not run_finished:
try:
await di.run(user_requirement)
score_dict = await di.get_score()
score_dict = self.evaluate(score_dict, self.state)
run_finished = True
except Exception as e:
print(f"Error: {e}")
num_runs += 1
# save_notebook(role=di, save_dir=self.result_path, name=f"{self.args.task}_{self.start_time}_{run_idx}")
save_name = self.get_save_name()
save_notebook(role=di, save_dir=self.result_path, name=f"{save_name}_{run_idx}")
if not run_finished:
score_dict = {"train_score": -1, "dev_score": -1, "test_score": -1, "score": -1}
return score_dict
def summarize_results(self, results):
dev_scores = [result["score_dict"]["dev_score"] for result in results]
best_dev_score = (
max(dev_scores)
if not self.args.low_is_better
else min([score for score in dev_scores if score != -1] + [np.inf])
)
best_score_idx = dev_scores.index(best_dev_score)
test_scores = [result["score_dict"]["test_score"] for result in results]
avg_score = sum(test_scores) / len(test_scores)
global_best_score = (
max(test_scores)
if not self.args.low_is_better
else min([score for i, score in enumerate(test_scores) if dev_scores[i] != -1] + [np.inf])
)
results.insert(
0,
{
"best_dev_score": best_dev_score,
"best_dev_score_idx": best_score_idx,
"best_dev_test_score": test_scores[best_score_idx],
"avg_test_score": avg_score,
"global_best_test_score": global_best_score,
},
)
return results
async def run_experiment(self):
state = self.state
user_requirement = state["requirement"]
results = []
for i in range(self.args.num_experiments):
di = ResearchAssistant(
node_id="0", use_reflection=self.args.reflection, role_timeout=self.args.role_timeout
)
score_dict = await self.run_di(di, user_requirement, run_idx=i)
results.append(
{"idx": i, "score_dict": score_dict, "user_requirement": user_requirement, "args": vars(self.args)}
)
self.save_result(results) # save intermediate results
results = self.summarize_results(results)
self.save_result(results)
def evaluate_prediction(self, split, state):
pred_path = os.path.join(state["work_dir"], state["task"], f"{split}_predictions.csv")
os.makedirs(state["node_dir"], exist_ok=True)
pred_node_path = os.path.join(state["node_dir"], f"{self.start_time}-{split}_predictions.csv")
gt_path = os.path.join(state["datasets_dir"][f"{split}_target"])
preds = pd.read_csv(pred_path)
preds = preds[preds.columns.tolist()[-1]]
preds.to_csv(pred_node_path, index=False)
gt = pd.read_csv(gt_path)["target"]
metric = state["dataset_config"]["metric"]
os.remove(pred_path)
return evaluate_score(preds, gt, metric)
def evaluate(self, score_dict, state):
scores = {
"dev_score": self.evaluate_prediction("dev", state),
"test_score": self.evaluate_prediction("test", state),
}
score_dict.update(scores)
return score_dict
def get_save_name(self):
return f"{self.args.exp_mode}-{self.args.task}_{self.start_time}"
def save_result(self, result):
end_time_raw = datetime.datetime.now()
end_time = end_time_raw.strftime("%Y%m%d%H%M")
time_info = {
"start_time": self.start_time,
"end_time": end_time,
"duration (seconds)": (end_time_raw - self.start_time_raw).seconds,
}
result = result.copy()
result.insert(0, time_info)
save_name = self.get_save_name()
os.makedirs(self.result_path, exist_ok=True)
with open(f"{self.result_path}/{save_name}.json", "w") as f:
json.dump(result, f, indent=4)

81
expo/experimenter/mcts.py Normal file
View file

@@ -0,0 +1,81 @@
import shutil
from expo.evaluation.evaluation import (
node_evaluate_score_mlebench,
node_evaluate_score_sela,
)
from expo.evaluation.visualize_mcts import get_tree_text
from expo.experimenter.experimenter import Experimenter
from expo.Greedy import Greedy, Random
from expo.MCTS import MCTS
class MCTSExperimenter(Experimenter):
result_path: str = "results/mcts"
def __init__(self, args, tree_mode=None, **kwargs):
if args.special_instruction == "image":
self.start_task_id = 1  # start from data preprocessing if it is an image task
else:
self.start_task_id = args.start_task_id
if args.eval_func == "sela":
self.eval_func = node_evaluate_score_sela
elif args.eval_func == "mlebench":
self.eval_func = node_evaluate_score_mlebench
super().__init__(args, **kwargs)
self.tree_mode = tree_mode
async def run_experiment(self):
use_fixed_insights = self.args.use_fixed_insights
depth = self.args.max_depth
if self.tree_mode == "greedy":
mcts = Greedy(root_node=None, max_depth=depth, use_fixed_insights=use_fixed_insights)
elif self.tree_mode == "random":
mcts = Random(root_node=None, max_depth=depth, use_fixed_insights=use_fixed_insights)
else:
mcts = MCTS(root_node=None, max_depth=depth, use_fixed_insights=use_fixed_insights)
best_nodes = await mcts.search(state=self.state, args=self.args)
best_node = best_nodes["global_best"]
dev_best_node = best_nodes["dev_best"]
score_dict = best_nodes["scores"]
additional_scores = {"grader": self.eval_func(dev_best_node)}
text, num_generated_codes = get_tree_text(mcts.root_node)
text += f"Generated {num_generated_codes} unique codes.\n"
text += f"Best node: {best_node.id}, score: {best_node.raw_reward}\n"
text += f"Dev best node: {dev_best_node.id}, score: {dev_best_node.raw_reward}\n"
text += f"Grader score: {additional_scores['grader']}\n"
print(text)
results = [
{
"best_node": best_node.id,
"best_node_score": best_node.raw_reward,
"dev_best_node": dev_best_node.id,
"dev_best_node_score": dev_best_node.raw_reward,
"num_generated_codes": num_generated_codes,
"user_requirement": best_node.state["requirement"],
"tree_text": text,
"args": vars(self.args),
"scores": score_dict,
"additional_scores": additional_scores,
}
]
self.save_result(results)
self.copy_notebook(best_node, "best")
self.copy_notebook(dev_best_node, "dev_best")
self.save_tree(text)
def copy_notebook(self, node, name):
node_dir = node.get_node_dir()
node_nb_dir = f"{node_dir}/Node-{node.id}.ipynb"
save_name = self.get_save_name()
copy_nb_dir = f"{self.result_path}/{save_name}_{name}.ipynb"
shutil.copy(node_nb_dir, copy_nb_dir)
def save_tree(self, tree_text):
save_name = self.get_save_name()
fpath = f"{self.result_path}/{save_name}_tree.txt"
with open(fpath, "w") as f:
f.write(tree_text)

48
expo/experimenter/mle_bench/instructions.py Normal file
View file

@@ -0,0 +1,48 @@
SUBMISSION_INSTURCTION = """
- **Submission checker**: We provide a tool for checking the validity of your submission file, in the form of a server endpoint at `http://localhost:5000/validate`. If you post a submission file to the server (e.g. `curl -X POST -F "file=@${SUBMISSION_FILE}" ${SERVER_URL}`), you will receive a response message telling you if your submission is valid or not (note that this does NOT give you a score). For your convenience, we provide a script `/home/validate_submission.sh` which can help you make this submission to the validation server.
"""
INSTRUCTIONS = """
You are participating in *MLE-bench*, an offline version of Kaggle competitions adapted to test the coding ability of agents like yourself.
You will be given a machine learning task. You must solve the task by training a model and running the model on the test set to produce a submission file. You may not hand-label the data to produce a submission file.
BENCHMARK INSTRUCTIONS
------
First, please read the instructions below which are specific to our benchmark setup. Note, these instructions can be found again in `expo.experimenter.mle_bench.instructions.py`.
- **Competition-specific instructions**: The instructions for the competition are available in `{dataset_dir}/description.md`. Read this file to understand the problem setup for the competition you are currently solving.
- **Dataset**: The dataset files for the competition are available in `{dataset_dir}`.
- **Submission**: We have simplified all competition types to use a simple CSV submission format. Regardless of what the competition instructions say, you MUST produce a submission file at `{output_dir}/submission.csv`. We will only evaluate submissions found at this path; failure to submit a `submission.csv` at this path will result in a failed attempt.
- **Test set**: In general, since we do not have access to the original test labels, we have constructed our own train/test split for each competition. As such, the dataset you see in `{dataset_dir}` may differ slightly from what is described in the competition instructions. Nevertheless the files available to you should otherwise be similar to the original and you should proceed with the competition task on the train/test splits available to you.
- **Cheating**: You are not allowed to view others' solutions to this competition. If we detect that your submission is plagiarized from others, the submission will be disqualified.
Your submission will be graded based on the metric in the competition-specific instructions. Your goal is to achieve the best score possible on the competition task.
Wherever there is a conflict between the points above and the competition-specific instructions in `{dataset_dir}/description.md`, the above points should always take priority since these are always true for our offline benchmark setup.
"""
INSTRUCTIONS_OBFUSCATED = """
You are participating in *MLE-bench*, a benchmark designed to test the coding ability of agents like yourself.
You will be given a machine learning task. You must solve the task by training a model and running the model on the test set to produce a submission file. You may not hand-label the data to produce a submission file.
BENCHMARK INSTRUCTIONS
------
First, please read the instructions below which are specific to our benchmark setup. Note, these instructions can be found again in `expo.experimenter.mle_bench.instructions.py`.
- **Task-specific instructions**: The instructions for the task are available in `{dataset_dir}/description.md`. Read this file to understand the problem setup for the task you are currently solving.
- **Dataset**: The dataset files for the task are available in `{dataset_dir}/`.
- **Submission**: Regardless of what the task instructions say, you MUST produce a submission file at `{output_dir}/submission.csv`. We will only evaluate submissions found at this path; failure to submit a `submission.csv` at this path will result in a failed attempt.
- **Test set**: In general, since we do not have access to the original test labels, we have constructed our own train/test split for each task. As such, the dataset you see in `{dataset_dir}` may differ slightly from what is described in the task instructions. Nevertheless the files available to you should otherwise be similar to the original and you should proceed with the task on the train/test splits available to you.
- **Cheating**: You are not allowed to view others' solutions to this task. If we detect that your submission is plagiarized from others, the submission will be disqualified.
Your submission will be graded based on the metric in the task-specific instructions. Your goal is to achieve the best score possible on the task.
Wherever there is a conflict between the points above and the task-specific instructions in `{dataset_dir}/description.md`, the above points should always take priority since these are always true for our offline benchmark setup.
"""
ADDITIONAL_NOTES = """
ADDITIONAL NOTES
------
- **Compute**: You have access to a GPU with the appropriate drivers installed.
"""

22
expo/insights/fixed_insights.json Normal file
View file

@@ -0,0 +1,22 @@
[
{
"Analysis": "Use early stopping, hyperparameter tuning, and cross-validation to avoid overfitting and improve robustness of the model.",
"Category": "Model Training",
"task_id": 4
},
{
"Analysis": "use k-fold bagging and early stopping",
"Category": "Model Training",
"task_id": 4
},
{
"Analysis": "To avoid overfitting, train a weighted ensemble model such as StackingClassifier or StackingRegressor; You could do some quick model prototyping to see which models work best and then use them in the ensemble.",
"Category": "Model Training",
"task_id": 4
},
{
"Analysis": "Please use autogluon for model training with presets='medium_quality', time_limit=None, give dev dataset to tuning_data, and use right eval_metric.",
"Category": "Model Training",
"task_id": 4
}
]

167
expo/insights/instruction_generator.py Normal file
View file

@@ -0,0 +1,167 @@
import json
import os
import random
from difflib import SequenceMatcher
from expo.insights.solution_designer import SolutionDesigner
from expo.utils import clean_json_from_rsp, load_data_config, mcts_logger
from metagpt.llm import LLM
from metagpt.schema import Message
REFLECTION_SYSTEM_MSG = "As a Kaggle grandmaster participating in a competition, you need to analyze your experience and propose evolutionary points that are more likely to improve the performance of baseline code."
CHANGE_INSTRUCTION = """
# Original instruction
{instruction}
# Insights
{insights}
Rewrite the original instruction according to the insights
# Expected Output Hard Format
```json
{{
"Original Instruction": "original instruction",
"New Instruction": "new instruction"
}}
```
"""
DATA_CONFIG = load_data_config()
class InstructionGenerator:
data_config = DATA_CONFIG
def __init__(self, state, use_fixed_insights, from_scratch):
self.state = state
self.file_path = state["exp_pool_path"]
if state["custom_dataset_dir"]:
with open(f"{state['custom_dataset_dir']}/description.md", "r", encoding="utf-8") as file:
self.dataset_info = file.read()
else:
dataset_info_path = (
f"{self.data_config['datasets_dir']}/{state['dataset_config']['dataset']}/dataset_info.json"
)
with open(dataset_info_path, "r") as file:
self.dataset_info = json.load(file)
self.use_fixed_insights = use_fixed_insights
self.proposer = SolutionDesigner()
if self.file_path is None:
self.from_scratch = True
else:
self.from_scratch = from_scratch
async def initialize(self):
if self.from_scratch:
self.insight_pool = await self.generate_solutions_from_scratch(self.dataset_info, self.state["task"])
else:
self.insight_pool = self.load_insight_pool(self.file_path, self.use_fixed_insights)
@staticmethod
def load_json_data(json_dir):
with open(json_dir, "r") as file:
json_data = json.load(file)
return json_data
@staticmethod
def _random_sample(analysis, num_samples):
return random.sample(analysis, num_samples)
@staticmethod
def sample_instruction_set(data):
data_dict = {}
for item in data:
task_id = item["task_id"]
if task_id not in data_dict:
data_dict[task_id] = []
data_dict[task_id].append(item)
instruction_set = []
for task_id in sorted(data_dict.keys()):
instruction_set.append(random.choice(data_dict[task_id]))
return instruction_set
@staticmethod
def format_output(rsp):
rsp_list = []
new_data = []
rsp_list.append(rsp)
for item in rsp_list:
item_dict = json.loads(item)
data = {
"Insights": item_dict,
}
new_data.append(data)
return new_data
@staticmethod
def load_insight_pool(file_path, use_fixed_insights, task_id=None):
data = InstructionGenerator.load_json_data(file_path)
if use_fixed_insights:
current_directory = os.path.dirname(__file__)
fixed_insights = InstructionGenerator.load_json_data(f"{current_directory}/fixed_insights.json")
data.extend(fixed_insights)
for item in data:
if "task_id" not in item:
raise ValueError("task_id is not found in the insight_pool")
if task_id:
data = [item for item in data if int(item["task_id"]) == int(task_id)]
return data
async def generate_new_instructions(self, task_id, original_instruction, max_num, ext_info=None):
data = self.insight_pool
new_instructions = []
if len(data) == 0:
mcts_logger.log("MCTS", f"No insights available for task {task_id}")
# return [original_instruction] # Return the original instruction if no insights are available
for i in range(max_num):
if len(data) == 0:
insights = "No insights available"
else:
item = data[i]
insights = item["Analysis"]
new_instruction = await InstructionGenerator.generate_new_instruction(
original_instruction, insights, ext_info
)
new_instructions.append(new_instruction)
return new_instructions
async def propose_new_insights(self, solution, score):
new_insights = await self.proposer.propose_insights(solution, score)
added_insights = self.add_insight(new_insights)
return added_insights
async def generate_solutions_from_scratch(self, dataset_info, dataset_name):
insight_pool = await self.proposer.generate_solutions(dataset_info, dataset_name, save_analysis_pool=False)
return insight_pool
def add_insight(self, new_insights):
added_insights = []
for new_insight in new_insights:
if not self.is_similar_to_existing(new_insight):
added_insights.append(new_insight)
self.insight_pool.append(new_insight)
return added_insights
def is_similar_to_existing(self, new_insight, similarity_threshold=0.8):
for existing_insight in self.insight_pool:
similarity = self.calculate_similarity(new_insight["Analysis"], existing_insight["Analysis"])
if similarity > similarity_threshold:
return True
return False
@staticmethod
def calculate_similarity(text1, text2):
return SequenceMatcher(None, text1, text2).ratio()
@staticmethod
async def generate_new_instruction(original_instruction, insights, ext_info):
prompt = CHANGE_INSTRUCTION.format(instruction=original_instruction, insights=insights)
llm = LLM()
context = llm.format_msg([Message(content=prompt, role="user")])
llm_response = await llm.aask(context, system_msgs=[REFLECTION_SYSTEM_MSG])
rsp = clean_json_from_rsp(llm_response)
new_instruction = json.loads(rsp)["New Instruction"]
return new_instruction
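For clarity, a small hypothetical example of the two sampling helpers above, using pool entries shaped like those in `fixed_insights.json`; the entries themselves are made up for illustration.

```python
# Hypothetical insight pool entries in the same shape as fixed_insights.json.
pool = [
    {"Analysis": "impute missing numeric values with the median", "Category": "Data Preprocessing", "task_id": 2},
    {"Analysis": "add pairwise interaction features", "Category": "Feature Engineering", "task_id": 3},
    {"Analysis": "use k-fold bagging and early stopping", "Category": "Model Training", "task_id": 4},
    {"Analysis": "train a stacking ensemble", "Category": "Model Training", "task_id": 4},
]

# One randomly chosen insight per task_id, ordered by task_id (used by the "set" augmentation mode).
print(InstructionGenerator.sample_instruction_set(pool))

# A random subsample of two insights (used by the "single" augmentation mode).
print(InstructionGenerator._random_sample(pool, 2))
```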

183
expo/insights/solution_designer.py Normal file
View file

@@ -0,0 +1,183 @@
import json
from expo.utils import clean_json_from_rsp, load_data_config
from metagpt.llm import LLM
DATA_CONFIG = load_data_config()
DATASET_DESCRIPTION_SELA_PROMPT = """
# Dataset Description
{dataset}
# Dataset Metadata
{metadata}
# Dataset Head
{head}
"""
DATASET_DESCRIPTION_CUSTOM_PROMPT = """
# Dataset Description
{dataset_description}
"""
DATASET_INSIGHT_PROMPT = """
{description}
# Instruction
Propose insights to help improve the performance of the model on this dataset.
The insights should be proposed based on the dataset description with different task types.
Each task type should have at least 5 insights.
Make sure each method is diverse enough and can be implemented separately.
Be specific about models' choices, ensemble and tuning techniques, and preprocessing & feature engineering techniques.
Your model choices should be advanced enough to be helpful.
# Format
```json
[
{{
"task_type": "EDA",
"insights": [
"insight1",
"insight2",
"insight3",
...
"insightN"
]
}},
{{
"task_type": "Data Preprocessing",
"insights": [
"insight1",
"insight2",
"insight3",
...
"insightN"
]
}},
{{
"task_type": "Feature Engineering",
"insights": [
"insight1",
"insight2",
"insight3",
...
"insightN"
]
}},
{{
"task_type": "Model Training",
"insights": [
"insight1",
"insight2",
"insight3",
...
"insightN"
]
}}
]
```
"""
INSIGHT_PROPOSAL_PROMPT = """
You are an AI assistant tasked with analyzing a machine learning solution and proposing new insights to improve its performance. Given the current solution code and development score, suggest innovative approaches to enhance the model.
Current Solution Code:
{solution_code}
Development Score: {dev_score}
Based on this information, propose 3-5 new insights across different aspects of the machine learning pipeline (Data Preprocessing, Feature Engineering, and Model Training). Your insights should be specific, actionable, and have the potential to improve the model's performance.
Please format your response as a JSON array with the following structure:
[
{{
"task_type": "Data Preprocessing",
"insights": [
"insight1",
"insight2"
]
}},
{{
"task_type": "Feature Engineering",
"insights": [
"insight1",
"insight2"
]
}},
{{
"task_type": "Model Training",
"insights": [
"insight1",
"insight2"
]
}}
]
"""
KEY_DATASET_FEATURES = [
"NumberOfClasses",
"NumberOfFeatures",
"NumberOfInstances",
"NumberOfInstancesWithMissingValues",
"NumberOfMissingValues",
"NumberOfNumericFeatures",
"NumberOfSymbolicFeatures",
]
TASK_TO_ID = {"EDA": 1, "Data Preprocessing": 2, "Feature Engineering": 3, "Model Training": 4, "Model Evaluation": 5}
class SolutionDesigner:
data_dir: str = DATA_CONFIG["datasets_dir"]
async def generate_solutions(self, dataset_info, dataset_name, save_analysis_pool=True):
llm = LLM()
if isinstance(dataset_info, dict):
description_prompt = DATASET_DESCRIPTION_SELA_PROMPT.format(
dataset=dataset_info["description"],
metadata=self.metadata_builder(dataset_info["metadata"]),
head=dataset_info["df_head"],
)
else:
description_prompt = DATASET_DESCRIPTION_CUSTOM_PROMPT.format(dataset_description=dataset_info)
context = DATASET_INSIGHT_PROMPT.format(description=description_prompt)
rsp = await llm.aask(context)
rsp = clean_json_from_rsp(rsp)
analysis_pool = self.process_analysis_pool(json.loads(rsp))
if save_analysis_pool:
dataset_path = f"{self.data_dir}/{dataset_name}"
self.save_analysis_pool(dataset_path, analysis_pool)
return analysis_pool
async def propose_new_insights(self, solution, score):
llm = LLM()
context = INSIGHT_PROPOSAL_PROMPT.format(solution_code=solution, dev_score=score)
rsp = await llm.aask(context)
rsp = clean_json_from_rsp(rsp)
new_insights = self.process_analysis_pool(json.loads(rsp))
return new_insights
def process_analysis_pool(self, insights_rsp):
analysis_pool = []
for task_type_insights in insights_rsp:
task_type = task_type_insights["task_type"]
for insight in task_type_insights["insights"]:
analysis_pool.append({"Analysis": insight, "Category": task_type, "task_id": TASK_TO_ID[task_type]})
return analysis_pool
def metadata_builder(self, qualities):
metadata = {}
for key in KEY_DATASET_FEATURES:
metadata[key] = qualities.get(key, "N/A")
metadata_text = json.dumps(metadata, indent=4)
return metadata_text
def save_analysis_pool(self, dataset_path, analysis_pool):
fpath = f"{dataset_path}/ds_analysis_pool.json"
with open(fpath, "w") as file:
json.dump(analysis_pool, file, indent=4)
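To illustrate the shape transformation performed by `process_analysis_pool` above, here is a hypothetical example; it assumes the data-config files that `expo.utils.load_data_config` expects are present, since `SolutionDesigner.data_dir` is resolved at import time.

```python
# Hypothetical parsed LLM response, matching the JSON format requested by DATASET_INSIGHT_PROMPT.
insights_rsp = [
    {"task_type": "Data Preprocessing", "insights": ["Impute numeric columns with the median."]},
    {"task_type": "Model Training", "insights": ["Try gradient boosting with early stopping."]},
]

designer = SolutionDesigner()
print(designer.process_analysis_pool(insights_rsp))
# [{'Analysis': 'Impute numeric columns with the median.', 'Category': 'Data Preprocessing', 'task_id': 2},
#  {'Analysis': 'Try gradient boosting with early stopping.', 'Category': 'Model Training', 'task_id': 4}]
```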

6
expo/requirements.txt Normal file
View file

@@ -0,0 +1,6 @@
# expo
openml==0.14.2
# ml module to run in DI
xgboost
catboost
lightgbm

195
expo/research_assistant.py Normal file
View file

@@ -0,0 +1,195 @@
from __future__ import annotations
import asyncio
import json
import os
from pydantic import model_validator
from expo.utils import mcts_logger, save_notebook
from metagpt.actions.di.write_analysis_code import WriteAnalysisCode
from metagpt.const import SERDESER_PATH
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.schema import Message, Task, TaskResult
from metagpt.utils.common import CodeParser, write_json_file
CODE_BLOCK_RESULT = """
## Code:
{code}
## Execution Result:
{result}
"""
EXTRACT_SCORE_PROMPT = """
# Code Blocks
{code_block}
# Instruction:
Based on the code and execution result, please extract the **final scores** and return it as a dictionary.
If you cannot find the scores, please still return a dictionary with the keys 'train_score', 'dev_score', and 'test_score', and set the values to -1.
# Format:
```json
{{
"train_score": x.x,
"dev_score": x.x,
"test_score": x.x
}}
```
"""
class TimeoutException(Exception):
pass
def async_timeout():
def decorator(func):
async def wrapper(self, *args, **kwargs):
try:
result = await asyncio.wait_for(func(self, *args, **kwargs), timeout=self.role_timeout)
except asyncio.TimeoutError:
text = f"Function timed out after {self.role_timeout} seconds"
mcts_logger.error(text)
self.save_state()
raise TimeoutException(text)
return result
return wrapper
return decorator
class ResearchAssistant(DataInterpreter):
node_id: str = "0"
start_task_id: int = 1
state_saved: bool = False
role_dir: str = SERDESER_PATH.joinpath("team", "environment", "roles", "Experimenter")
role_timeout: int = 1000
def get_node_name(self):
return f"Node-{self.node_id}"
def get_next_instruction(self):
return self.planner.plan.tasks[self.start_task_id]
def change_next_instruction(self, new_instruction):
if new_instruction is not None:
self.planner.plan.task_map[str(self.start_task_id)].instruction = new_instruction
self.remap_tasks()
def update_til_start_task(self, role: ResearchAssistant, backward: bool = True):
if backward:
# make sure the previous task instructions are matched
assert (
self.start_task_id == role.start_task_id - 1
), f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
for i in range(self.start_task_id):
if (
self.planner.plan.task_map[str(i)].instruction  # compare each earlier task, not the start task repeatedly
!= role.planner.plan.task_map[str(i)].instruction
):
mcts_logger.info("Previous task instructions not matched")
self.remap_tasks()
return
# copy new role's task (self.start_task_id) to current role
self.planner.plan.task_map[str(self.start_task_id)] = role.planner.plan.task_map[
str(self.start_task_id)
].model_copy()
self.remap_tasks()
else:
assert (
self.start_task_id == role.start_task_id + 1
), f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
if int(role.planner.plan.current_task_id) > self.start_task_id:
for i in range(role.start_task_id):
self.planner.plan.task_map[str(i)] = role.planner.plan.task_map[str(i)].model_copy()
self.remap_tasks()
async def get_score(self):
score_dict = await self.llm_extract_score()
score_dict["score"] = score_dict["dev_score"]
return score_dict
async def llm_extract_score(self):
# result_text = self.planner.plan.task_map[str(len(self.planner.plan.task_map))].result
# code_text = self.planner.plan.task_map[str(len(self.planner.plan.task_map))].code
num_tasks = len(self.planner.plan.task_map)
task_map = self.planner.plan.task_map
code_block = "\n".join(
[
CODE_BLOCK_RESULT.format(code=task_map[str(i + 1)].code, result=task_map[str(i + 1)].result)
for i in range(num_tasks)
]
)
rsp = await self.llm.aask(EXTRACT_SCORE_PROMPT.format(code_block=code_block, role="user"))
json_block = CodeParser.parse_code(block=None, text=rsp)
score_dict = json.loads(json_block)
return score_dict
@model_validator(mode="after")
def set_plan_and_tool(self) -> "Interpreter":
if self.planner.plan.goal != "":
self.set_actions([WriteAnalysisCode])
self._set_state(0)
print("Plan already exists, skipping initialization.")
return self
print("Initializing plan and tool...")
return super().set_plan_and_tool()
async def _act_on_task(self, current_task: Task) -> TaskResult:
"""Useful in 'plan_and_act' mode. Wrap the output in a TaskResult for review and confirmation."""
mcts_logger.info(f"The current_task is: {current_task}")
code, result, is_success = await self._write_and_exec_code()
task_result = TaskResult(code=code, result=result, is_success=is_success)
if int(current_task.task_id) == self.start_task_id + 1:
# fe_id = current_task.dependent_task_ids
self.save_state()
save_notebook(role=self, save_dir=self.role_dir, name=self.get_node_name(), save_to_depth=True)
else:
save_notebook(role=self, save_dir=self.role_dir, name=self.get_node_name())
return task_result
def get_solution(self):
codes = [task.code for task in self.planner.plan.tasks]
results = [task.result for task in self.planner.plan.tasks]
return {"codes": codes, "results": results}
def save_state(self, static_save=False):
"""
attribute:
state_saved - the state has been saved
input:
static_save - saving the state without changing the state_saved flag - used when a new role is created
"""
if self.state_saved and not static_save:
return
if not static_save:
self.state_saved = True
mcts_logger.log("MCTS", f"Saving state at task {self.start_task_id}")
else:
mcts_logger.log("MCTS", "Static Saving")
stg_path = self.role_dir
name = self.get_node_name()
role_path = os.path.join(stg_path, f"{name}.json")
# save state as json file
write_json_file(role_path, self.model_dump())
def remap_tasks(self):
self.planner.plan.tasks = [
self.planner.plan.task_map[task_id] for task_id in sorted(self.planner.plan.task_map.keys())
]
@async_timeout()
async def run(self, with_message=None) -> Message | None:
"""Observe, and think and act based on the results of the observation"""
if with_message == "continue":
mcts_logger.info("Continue to run")
self.rc.working_memory.clear()
self.working_memory.clear()
rsp = await self.react()
self.set_todo(None)
self.publish_message(rsp)
return rsp
return await super().run(with_message)
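The `async_timeout` decorator above only assumes that the decorated object carries a `role_timeout` attribute and a `save_state()` method. A minimal hypothetical illustration (not part of the repository, and assuming the expo data-config files are in place so the module imports cleanly):

```python
# Hypothetical illustration of the async_timeout decorator defined above.
import asyncio

from expo.research_assistant import TimeoutException, async_timeout


class SlowWorker:
    role_timeout = 1  # seconds before the wrapped coroutine is aborted

    def save_state(self):
        print("state persisted before aborting")

    @async_timeout()
    async def run(self):
        await asyncio.sleep(5)  # exceeds role_timeout, so TimeoutException is raised


try:
    asyncio.run(SlowWorker().run())
except TimeoutException as exc:
    print(exc)  # "Function timed out after 1 seconds"
```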

0
expo/results/PLACEHOLDER Normal file
View file

0
expo/results/tree/TREE Normal file
View file

99
expo/run_experiment.py Normal file
View file

@@ -0,0 +1,99 @@
import argparse
import asyncio
from expo.data.custom_task import get_mle_is_lower_better, get_mle_task_id
from expo.experimenter.aug import AugExperimenter
from expo.experimenter.autogluon import GluonExperimenter
from expo.experimenter.autosklearn import AutoSklearnExperimenter
from expo.experimenter.custom import CustomExperimenter
from expo.experimenter.experimenter import Experimenter
from expo.experimenter.mcts import MCTSExperimenter
def get_args(cmd=True):
parser = argparse.ArgumentParser()
parser.add_argument("--name", type=str, default="")
parser.add_argument(
"--exp_mode",
type=str,
default="mcts",
choices=["mcts", "aug", "base", "custom", "greedy", "autogluon", "random", "autosklearn"],
)
parser.add_argument("--role_timeout", type=int, default=1000)
get_di_args(parser)
get_mcts_args(parser)
get_aug_exp_args(parser)
if cmd:
args = parser.parse_args()
else:
args = parser.parse_args("")
if args.custom_dataset_dir:
args.external_eval = False
args.eval_func = "mlebench"
args.from_scratch = True
args.task = get_mle_task_id(args.custom_dataset_dir)
args.low_is_better = get_mle_is_lower_better(args.task)
return args
def get_mcts_args(parser):
parser.add_argument("--load_tree", dest="load_tree", action="store_true")
parser.add_argument("--no_load_tree", dest="load_tree", action="store_false")
parser.set_defaults(load_tree=False)
parser.add_argument("--rollouts", type=int, default=5)
parser.add_argument("--use_fixed_insights", dest="use_fixed_insights", action="store_true")
parser.set_defaults(use_fixed_insights=False)
parser.add_argument("--start_task_id", type=int, default=2)
parser.add_argument(
"--from_scratch", dest="from_scratch", action="store_true", help="Generate solutions from scratch"
)
parser.set_defaults(from_scratch=False)
parser.add_argument("--no_external_eval", dest="external_eval", action="store_false")
parser.set_defaults(external_eval=True)
parser.add_argument("--eval_func", type=str, default="sela", choices=["sela", "mlebench"])
parser.add_argument("--custom_dataset_dir", type=str, default=None)
parser.add_argument("--max_depth", type=int, default=4)
def get_aug_exp_args(parser):
parser.add_argument("--aug_mode", type=str, default="single", choices=["single", "set"])
parser.add_argument("--is_multimodal", action="store_true", help="Specify if the model is multi-modal")
def get_di_args(parser):
parser.add_argument("--task", type=str, default="titanic")
parser.add_argument("--low_is_better", dest="low_is_better", action="store_true")
parser.set_defaults(low_is_better=False)
parser.add_argument("--reflection", dest="reflection", action="store_true")
parser.add_argument("--no_reflection", dest="reflection", action="store_false")
parser.add_argument("--num_experiments", type=int, default=1)
parser.add_argument("--special_instruction", type=str, default=None, choices=["ag", "stacking", "text", "image"])
parser.set_defaults(reflection=True)
async def main(args):
if args.exp_mode == "mcts":
experimenter = MCTSExperimenter(args)
elif args.exp_mode == "greedy":
experimenter = MCTSExperimenter(args, tree_mode="greedy")
elif args.exp_mode == "random":
experimenter = MCTSExperimenter(args, tree_mode="random")
elif args.exp_mode == "aug":
experimenter = AugExperimenter(args)
elif args.exp_mode == "base":
experimenter = Experimenter(args)
elif args.exp_mode == "autogluon":
experimenter = GluonExperimenter(args)
elif args.exp_mode == "custom":
experimenter = CustomExperimenter(args)
elif args.exp_mode == "autosklearn":
experimenter = AutoSklearnExperimenter(args)
else:
raise ValueError(f"Invalid exp_mode: {args.exp_mode}")
await experimenter.run_experiment()
if __name__ == "__main__":
args = get_args()
asyncio.run(main(args))
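Besides the CLI (see the shell scripts below), the runner can be driven programmatically: `get_args(cmd=False)` returns the parser defaults, which can then be overridden before calling `main`. A hedged sketch, assuming the default `data.yaml`/`datasets.yaml` configuration is available:

```python
# Hypothetical programmatic invocation of the experiment runner above.
import asyncio

from expo.run_experiment import get_args, main

args = get_args(cmd=False)   # parser defaults, no command line needed
args.exp_mode = "greedy"     # same choices as --exp_mode
args.task = "titanic"
args.rollouts = 3
asyncio.run(main(args))
```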

15
expo/scripts/run_cls.sh Normal file
View file

@@ -0,0 +1,15 @@
#!/bin/bash
tasks=("smoker-status" "software-defects" "jasmine" "credit-g" "Click_prediction_small" "kick" "kc1" "titanic" "icr" "wine-quality-white" "mfeat-factors" "segment" "GesturePhaseSegmentationProcessed")
for i in {1..3}
do
for task in "${tasks[@]}"; do
echo "Running experiment for task: $task"
python run_experiment.py --exp_mode mcts --task "$task" --rollouts 10 --special_instruction stacking
echo "Experiment for task $task completed."
done
done
echo "All experiments completed."

View file

@@ -0,0 +1,13 @@
#!/bin/bash
tasks=("banking77" "gnad10" "sms_spam" "oxford-iiit-pet" "stanford_cars" "fashion_mnist" )
for i in {1..3}
do
for task in "${tasks[@]}"; do
echo "Running experiment for task: $task"
python run_experiment.py --exp_mode mcts --task "$task" --rollouts 10
echo "Experiment for task $task completed."
done
done
echo "All experiments completed."

14
expo/scripts/run_reg.sh Normal file
View file

@@ -0,0 +1,14 @@
#!/bin/bash
tasks=("concrete-strength" "Moneyball" "colleges" "SAT11-HAND-runtime-regression" "diamonds" "boston" "house-prices")
for i in {1..3}
do
for task in "${tasks[@]}"; do
echo "Running experiment for task: $task"
python run_experiment.py --exp_mode mcts --task "$task" --rollouts 10 --low_is_better --special_instruction stacking
echo "Experiment for task $task completed."
done
done
echo "All experiments completed."

View file

@@ -0,0 +1,23 @@
import networkx as nx
from expo.evaluation.visualize_mcts import build_tree_recursive, visualize_tree
from expo.MCTS import MCTS, create_initial_state, initialize_di_root_node
from expo.run_experiment import get_args
from expo.utils import DATA_CONFIG
if __name__ == "__main__":
args = get_args()
data_config = DATA_CONFIG
state = create_initial_state(args.task, 0, data_config, args=args)
role, node = initialize_di_root_node(state)
mcts = MCTS(
root_node=node,
max_depth=5,
use_fixed_insights=False,
)
mcts.load_tree()
root = mcts.root_node
G = nx.DiGraph()
build_tree_recursive(G, "0", root)
visualize_tree(G, save_path="results/tree.png")

130
expo/utils.py Normal file
View file

@@ -0,0 +1,130 @@
import os
import re
from datetime import datetime
from pathlib import Path
import nbformat
import yaml
from loguru import logger as _logger
from nbclient import NotebookClient
from nbformat.notebooknode import NotebookNode
from metagpt.roles.role import Role
def load_data_config(file_path="data.yaml"):
with open(file_path, "r") as stream:
data_config = yaml.safe_load(stream)
return data_config
DATASET_CONFIG = load_data_config("datasets.yaml")
DATA_CONFIG = load_data_config()
DATA_CONFIG["datasets"] = DATASET_CONFIG["datasets"]
def get_mcts_logger():
logfile_level = "DEBUG"
name: str = None
current_date = datetime.now()
formatted_date = current_date.strftime("%Y%m%d")
log_name = f"{name}_{formatted_date}" if name else formatted_date # name a log with prefix name
# _logger.remove()
_logger.level("MCTS", color="<green>", no=25)
# _logger.add(sys.stderr, level=print_level)
_logger.add(Path(DATA_CONFIG["work_dir"]) / DATA_CONFIG["role_dir"] / f"{log_name}.txt", level=logfile_level)
_logger.propagate = False
return _logger
mcts_logger = get_mcts_logger()
def get_exp_pool_path(task_name, data_config, pool_name="analysis_pool"):
datasets_dir = data_config["datasets_dir"]
if task_name in data_config["datasets"]:
dataset = data_config["datasets"][task_name]
data_path = os.path.join(datasets_dir, dataset["dataset"])
else:
raise ValueError(
f"Dataset {task_name} not found in config file. Available datasets: {data_config['datasets'].keys()}"
)
exp_pool_path = os.path.join(data_path, f"{pool_name}.json")
if not os.path.exists(exp_pool_path):
return None
return exp_pool_path
def change_plan(role, plan):
print(f"Change next plan to: {plan}")
tasks = role.planner.plan.tasks
finished = True
for i, task in enumerate(tasks):
if not task.code:
finished = False
break
if not finished:
tasks[i].plan = plan
return finished
def is_cell_to_delete(cell: NotebookNode) -> bool:
if "outputs" in cell:
for output in cell["outputs"]:
if output and "traceback" in output:
return True
return False
def process_cells(nb: NotebookNode) -> NotebookNode:
new_cells = []
i = 1
for cell in nb["cells"]:
if cell["cell_type"] == "code" and not is_cell_to_delete(cell):
cell["execution_count"] = i
new_cells.append(cell)
i = i + 1
nb["cells"] = new_cells
return nb
def save_notebook(role: Role, save_dir: str = "", name: str = "", save_to_depth=False):
save_dir = Path(save_dir)
tasks = role.planner.plan.tasks
nb = process_cells(role.execute_code.nb)
os.makedirs(save_dir, exist_ok=True)
file_path = save_dir / f"{name}.ipynb"
nbformat.write(nb, file_path)
if save_to_depth:
clean_file_path = save_dir / f"{name}_clean.ipynb"
codes = [task.code for task in tasks if task.code]
clean_nb = nbformat.v4.new_notebook()
for code in codes:
clean_nb.cells.append(nbformat.v4.new_code_cell(code))
nbformat.write(clean_nb, clean_file_path)
async def load_execute_notebook(role):
tasks = role.planner.plan.tasks
codes = [task.code for task in tasks if task.code]
executor = role.execute_code
executor.nb = nbformat.v4.new_notebook()
executor.nb_client = NotebookClient(executor.nb, timeout=role.role_timeout)
# await executor.build()
for code in codes:
outputs, success = await executor.run(code)
print(f"Execution success: {success}, Output: {outputs}")
print("Finish executing the loaded notebook")
return executor
def clean_json_from_rsp(text):
pattern = r"```json(.*?)```"
matches = re.findall(pattern, text, re.DOTALL)
if matches:
json_str = "\n".join(matches)
return json_str
else:
return ""

View file

@@ -6,8 +6,6 @@
"""
from __future__ import annotations
import json
from metagpt.actions import Action
from metagpt.prompts.di.write_analysis_code import (
CHECK_DATA_PROMPT,
@@ -30,9 +28,10 @@ class WriteAnalysisCode(Action):
)
rsp = await self._aask(reflection_prompt, system_msgs=[REFLECTION_SYSTEM_MSG])
reflection = json.loads(CodeParser.parse_code(block=None, text=rsp))
return reflection["improved_impl"]
# reflection = json.loads(CodeParser.parse_code(block=None, text=rsp))
# return reflection["improved_impl"]
reflection = CodeParser.parse_code(block=None, text=rsp)
return reflection
async def run(
self,

View file

@@ -40,15 +40,17 @@ Tests failed:
assert add(1, 2) == 3 # output: -1
assert add(1, 3) == 4 # output: -2
[reflection on previous impl]:
[reflection on previous impl]
The implementation failed the test cases where the input integers are 1 and 2. The issue arises because the code does not add the two integers together, but instead subtracts the second integer from the first. To fix this issue, we should change the operator from `-` to `+` in the return statement. This will ensure that the function returns the correct output for the given input.
[improved impl]:
[improved impl]
```python
def add(a: int, b: int) -> int:
"""
Given integers a and b, return the total value of a and b.
"""
return a + b
```
'''
REFLECTION_PROMPT = """
@@ -60,17 +62,17 @@ Here is an example of debugging with reflection.
[context]
{context}
[previous impl]:
[previous impl]
{previous_impl}
[instruction]
Analyze your previous code and error in [context] step by step, provide me with improved method and code. Remember to follow [context] requirement. Don't forget to write code for steps behind the error step.
Output a json following the format:
```json
{{
"reflection": str = "Reflection on previous implementation",
"improved_impl": str = "Refined code after reflection.",
}}
Output in the following format:
[reflection on previous impl]
...
[improved impl]:
```python
# your code
```
"""

View file

@@ -11,7 +11,7 @@ The current task is about data preprocessing, please note the following:
- Monitor data types per column, applying appropriate methods.
- Ensure operations are on existing dataset columns.
- Avoid writing processed data to files.
- Avoid any change to label column, such as standardization, etc.
- **ATTENTION** Do NOT make any changes to the label column, such as standardization, etc.
- Prefer alternatives to one-hot encoding for categorical data.
- Only encode or scale necessary columns to allow for potential feature-specific engineering tasks (like time_extract, binning, extraction, etc.) later.
- Each step do data preprocessing to train, must do same for test separately at the same time.
@@ -25,8 +25,8 @@ The current task is about feature engineering. when performing it, please adhere
- Use available feature engineering tools if they are potential impactful.
- Avoid creating redundant or excessively numerous features in one step.
- Exclude ID columns from feature generation and remove them.
- Each feature engineering operation performed on the train set must also applies to the test separately at the same time.
- Avoid using the label column to create features, except for cat encoding.
- Each feature engineering operation performed on the train set must also applies to the dev/test separately at the same time.
- **ATTENTION** Do NOT use the label column to create features, except for cat encoding.
- Use the data from previous task result if exist, do not mock or reload data yourself.
- Always copy the DataFrame before processing it and use the copy to process.
"""
@@ -34,6 +34,10 @@ The current task is about feature engineering. when performing it, please adhere
# Prompt for taking on "model_train" tasks
MODEL_TRAIN_PROMPT = """
The current task is about training a model, please ensure high performance:
- For tabular datasets - you have access to XGBoost, CatBoost, random forest, extremely randomized trees, k-nearest neighbors, linear regression, etc.
- For image datasets - you have access to Swin Transformer, ViT, ResNet, EfficientNet, etc.
- For text datasets - you have access to Electra, DeBERTa, GPT-2, BERT, etc.
- Avoid the use of SVM because of its high training time.
- Keep in mind that your user prioritizes results and is highly focused on model performance. So, when needed, feel free to use models of any complexity to improve effectiveness, such as XGBoost, CatBoost, etc.
- If non-numeric columns exist, perform label encode together with all steps.
- Use the data from previous task result directly, do not mock or reload data yourself.

View file

@@ -478,10 +478,10 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
async def _plan_and_act(self) -> Message:
"""first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ... Use llm to come up with the plan dynamically."""
# create initial plan and update it until confirmation
goal = self.rc.memory.get()[-1].content # retreive latest user requirement
await self.planner.update_plan(goal=goal)
if not self.planner.plan.goal:
# create initial plan and update it until confirmation
goal = self.rc.memory.get()[-1].content # retreive latest user requirement
await self.planner.update_plan(goal=goal)
# take on tasks until all finished
while self.planner.current_task: