Merge branch 'experimenter' into 'expo'

add experimenter

See merge request agents/exp_optimizer!1
This commit is contained in:
林义章 2024-09-03 05:55:19 +00:00
commit 1a833dc3a9
26 changed files with 2109 additions and 6 deletions

1
.gitignore vendored
View file

@ -188,3 +188,4 @@ cov.xml
*-structure.json
*.dot
.python-version
expo/results/*

355
expo/MCTS.py Normal file
View file

@ -0,0 +1,355 @@
import random
import math
import os
import pandas as pd
from expo.research_assistant import ResearchAssistant
from expo.insights.instruction_generator import InstructionGenerator
from expo.dataset import get_split_dataset_path, generate_task_requirement
from expo.evaluation.evaluation import evaluate_score
from expo.utils import mcts_logger, load_execute_notebook, get_exp_pool_path
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info
import numpy as np
import pickle
def initialize_di_root_node(task, data_config, low_is_better=False, reflection=True, name=""):
    """Create the root search node and the role that will execute it.

    Args:
        task: Key of the dataset entry inside ``data_config["datasets"]``.
        data_config: Parsed data configuration mapping.
        low_is_better: Stored in the root state for later score normalisation.
        reflection: Forwarded to ``ResearchAssistant`` as ``use_reflection``.
        name: Suffix appended to ``task`` when the node directory is built.

    Returns:
        A ``(role, root_node)`` tuple.
    """
    first_task_id = 2
    root_state = create_initial_state(
        task,
        start_task_id=first_task_id,
        data_config=data_config,
        low_is_better=low_is_better,
        name=name,
    )
    root_role = ResearchAssistant(
        node_id="0",
        start_task_id=first_task_id,
        use_reflection=reflection,
        role_dir=root_state["node_dir"],
    )
    root_node = Node(parent=None, state=root_state, action=None, value=0)
    return root_role, root_node
def create_initial_state(task, start_task_id, data_config, low_is_better, name):
    """Assemble the state dictionary carried by the root node.

    Args:
        task: Key of the dataset entry inside ``data_config["datasets"]``.
        start_task_id: Task id stored under ``"start_task_id"``.
        data_config: Parsed data configuration mapping; ``work_dir``,
            ``role_dir`` and ``datasets`` are read from it directly.
        low_is_better: Stored under ``"low_is_better"``.
        name: Suffix appended to ``task`` for the node directory name.

    Returns:
        A new dict describing the task, its paths and its requirement text.
    """
    work_dir = data_config["work_dir"]
    node_dir = os.path.join(work_dir, data_config["role_dir"], f"{task}{name}")
    return {
        "task": task,
        "work_dir": work_dir,
        "node_dir": node_dir,
        "dataset_config": data_config["datasets"][task],
        "datasets_dir": get_split_dataset_path(task, data_config),
        "exp_pool_path": get_exp_pool_path(task, data_config, pool_name="ds_analysis_pool"),
        "requirement": generate_task_requirement(task, data_config),
        "has_run": False,
        "start_task_id": start_task_id,
        "low_is_better": low_is_better,
    }
class Node():
    """One node of the experiment search tree.

    A node stores the task ``state`` dict, the instruction (``action``) that
    distinguishes it from its siblings, accumulated reward statistics and links
    to its parent and children. Nodes with a parent are pickled to
    ``state["node_dir"]`` when created and again after every ``update``.
    """
    # Class-level values act only as fallbacks (e.g. for nodes unpickled from
    # files written before an attribute was stored per instance); __init__
    # gives every new node its own copies.
    state : dict = {}
    action : str = None
    value : float = 0
    visited : int = 0
    children : list = []
    normalized_reward : dict = {"train_score": 0, "dev_score": 0, "test_score": 0}
    parent = None

    def __init__(self, parent=None, state = None, action=None, value = 0, max_depth=4, **kwargs):
        """Create a node and, unless it is the root, persist it immediately.

        Args:
            parent: Parent ``Node`` or ``None`` for the root.
            state: State dict; must contain ``"node_dir"`` and ``"start_task_id"``.
            action: Instruction text associated with this node.
            value: Initial accumulated reward.
            max_depth: Depth limit used by ``is_terminal``.
            **kwargs: Accepted and ignored.
        """
        self.state = state
        self.action = action
        self.value = value
        self.raw_value = 0
        self.raw_reward = dict()
        # Per-instance copies so no node ever shares the class-level defaults.
        self.visited = 0
        self.normalized_reward = {"train_score": 0, "dev_score": 0, "test_score": 0}
        self.parent = parent
        self.children = []
        self.max_depth = max_depth
        self.depth = self.generate_depth()
        self.id = self.generate_id()
        if self.parent is not None:
            self.save_node()

    def avg_value(self):
        """Return ``value / visited``, or 0 for a node that was never visited."""
        if self.visited == 0:
            return 0
        return self.value / self.visited

    def __hash__(self):
        """Hash by node id so nodes can be used as dictionary keys."""
        return hash(self.id)

    def save_node(self):
        """Pickle this node to ``Node-<id>.pkl`` inside ``state["node_dir"]``."""
        os.makedirs(self.state["node_dir"], exist_ok=True)
        with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'wb') as f:
            pickle.dump(self, f)

    def load_node(self):
        """Return the node unpickled from this node's ``.pkl`` file.

        The loaded object is returned; ``self`` is not modified.
        """
        # NOTE(security): pickle.load executes arbitrary code from the file;
        # only load node files from a trusted working directory.
        with open(os.path.join(self.state["node_dir"], f"Node-{self.id}.pkl"), 'rb') as f:
            return pickle.load(f)

    def get_depth(self):
        """Return the depth computed when the node was created."""
        return self.depth

    def generate_depth(self):
        """Return 0 for the root, otherwise the parent's depth plus one."""
        if self.parent is None:
            return 0
        else:
            return self.parent.depth + 1

    def generate_id(self):
        """Return ``"0"`` for the root, else ``"<parent id>-<sibling index>"``.

        The sibling index is the number of children the parent has at the
        moment this node is created.
        """
        if self.parent is None:
            return "0"
        else:
            num_sibling = len(self.parent.children)
            return f"{self.parent.id}-{num_sibling}"

    def is_terminal(self):
        """Return True when ``start_task_id`` equals ``max_depth + 1``."""
        return int(self.state["start_task_id"]) == self.max_depth + 1

    def is_fully_expanded(self):
        """Return True once the node has at least one child."""
        return len(self.children) > 0

    def add_child(self, child_node):
        """Append ``child_node`` to this node's children."""
        self.children.append(child_node)

    def update(self, reward:dict, child_node=None):
        """Record one backpropagated reward on this node and re-save it.

        Args:
            reward: Dict with a ``"score"`` entry; ``"test_score"`` is also
                read when ``child_node`` is ``None``.
            child_node: When given, the child's role is loaded and passed to
                this node's role via ``update_til_start_task`` before the
                role state is saved. When ``None`` the node itself was
                simulated and ``raw_value`` is set from ``"test_score"``.
        """
        if child_node is not None:
            child_role = child_node.load_role()
            role = self.load_role()
            role.update_til_start_task(child_role)
            role.save_state()
        else:
            self.raw_value = reward["test_score"]
        self.value += reward["score"]
        self.visited += 1
        self.save_node()

    def get_role_path(self):
        """Return the path of the JSON file holding this node's role."""
        fname = f"Node-{self.id}.json"
        role_path = os.path.join(self.state["node_dir"], fname)
        return role_path

    def load_role(self):
        """Rebuild this node's ``ResearchAssistant`` from its JSON file.

        For non-root nodes the parent's role is loaded as well and applied
        with ``update_til_start_task(..., backward=False)``.
        """
        role_dict = read_json_file(self.get_role_path())
        if role_dict.get('tool_recommender') is None:
            role_dict['tool_recommender'] = ToolRecommender()
        elif isinstance(role_dict.get('tool_recommender', {}).get('tools'), dict):
            # The saved file stores tools as a mapping; only its keys are kept.
            role_dict['tool_recommender']['tools'] = list(role_dict['tool_recommender']['tools'].keys())
        role = ResearchAssistant(**role_dict)
        if self.parent is not None: # TODO: Check this
            parent_role = self.parent.load_role()
            role.update_til_start_task(parent_role, backward=False)
        role.remap_tasks()
        return role

    def save_new_role(self, role: "ResearchAssistant"):
        """Bind ``role`` to this node, set its next instruction and save it."""
        role.node_id = self.id
        role.start_task_id = self.state['start_task_id']
        role.state_saved = False
        role.change_next_instruction(self.action)
        mcts_logger.log("MCTS", f"Saving new role: {role.node_id}")
        role.save_state(static_save=True)

    async def expand(self, max_children):
        """Create child nodes from newly generated instructions.

        Does nothing when the node already has children. Otherwise asks an
        ``InstructionGenerator`` for up to ``max_children`` alternatives to
        the role's next instruction and adds one child per alternative.
        """
        if self.is_fully_expanded():
            return
        insight_generator = InstructionGenerator()
        role = self.load_role()
        original_instruction = role.get_next_instruction()
        insights = await insight_generator.generate_new_instructions(task_id=role.start_task_id + 1,
                                                                     original_instruction=original_instruction,
                                                                     max_num=max_children,
                                                                     file_path=self.state["exp_pool_path"])
        for insight in insights:
            # Each child gets its own (shallow) state copy so a later change
            # to one child's top-level state cannot leak into its siblings.
            child_state = self.state.copy()
            child_state['start_task_id'] += 1
            new_role = role.model_copy()
            # Children inherit the depth limit instead of falling back to the default.
            node = Node(parent=self, state=child_state, action=insight, value=0, max_depth=self.max_depth)
            node.save_new_role(new_role)
            self.add_child(node)

    def evaluate_prediction(self, split):
        """Score the ``<split>_predictions.csv`` written to the task's work dir.

        The ``target`` column of the prediction file is also copied into the
        node directory as ``Node-<id>-<split>_predictions.csv``.

        Args:
            split: Split name; used to build the prediction file name and the
                ``"<split>_target"`` key looked up in ``state["datasets_dir"]``.

        Returns:
            The result of ``evaluate_score`` for the configured metric.
        """
        pred_path = os.path.join(self.state["work_dir"], self.state["task"], f"{split}_predictions.csv")
        pred_node_path = os.path.join(self.state["node_dir"], f"Node-{self.id}-{split}_predictions.csv")
        gt_path = os.path.join(self.state["datasets_dir"][f"{split}_target"])
        preds = pd.read_csv(pred_path)["target"]
        preds.to_csv(pred_node_path, index=False)
        gt = pd.read_csv(gt_path)["target"]
        metric = self.state["dataset_config"]["metric"]
        return evaluate_score(preds, gt, metric)

    def evaluate_simulation(self, score_dict):
        """Add ``dev_score`` and ``test_score`` to ``score_dict`` and return it."""
        scores = {
            "dev_score": self.evaluate_prediction("dev"),
            "test_score": self.evaluate_prediction("test")
        }
        score_dict.update(scores)
        return score_dict

    async def run_node(self, role=None):
        """Run this node's role and return its reward dictionary.

        Args:
            role: Role to run with the full requirement. When omitted, the
                node's saved role is loaded, the previous notebook code is
                re-executed and the role is run with ``'continue'``.

        Returns:
            The score dict. When ``state["low_is_better"]`` is set, each score
            ``s`` is mapped to ``1 / (1 + s)`` (and ``-1`` to ``0``) so that
            higher is better; the unmapped scores stay in ``raw_reward``.
        """
        if self.is_terminal() and role is not None:
            if role.state_saved:
                return self.raw_reward
        if not role:
            role = self.load_role()
            await load_execute_notebook(role) # execute previous notebook's code
            await role.run(with_message='continue')
        else:
            await role.run(with_message=self.state['requirement'])
        score_dict = await role.get_score()
        score_dict = self.evaluate_simulation(score_dict)
        self.raw_reward = score_dict
        if self.state["low_is_better"]:
            # normalized the score to be between 0 and 1, and higher is better
            def normalize_score(score):
                if score == -1:
                    return 0
                return 1 / (1 + score)
            score_dict = {k: normalize_score(v) for k, v in score_dict.items()}
        self.normalized_reward = score_dict
        return score_dict
class MCTS():
    """Monte Carlo tree search over experiment nodes.

    ``children`` maps every expanded node to its list of child nodes; node
    selection uses a UCT score computed over all registered children.
    """
    #data_path
    root_node : "Node" = None
    children : dict = {}  # class-level fallback only; each instance gets its own dict in __init__
    max_depth : int = 5
    c_explore : float = 1.4
    c_unvisited : float = 0.8

    def __init__(self, root_node, max_depth):
        """Store the root node and depth limit and start with an empty tree map."""
        self.root_node = root_node
        self.max_depth = max_depth
        # A fresh dict per search; mutating the class-level dict would make
        # separate MCTS objects share (and corrupt) each other's trees.
        self.children = {}

    def select(self, node: "Node"):
        """Return the registered child with the best UCT score.

        The ``node`` argument is not used: selection always considers every
        child registered in ``self.children``.
        """
        node = self.best_child()
        mcts_logger.log("MCTS", f"Selected node id: {node.id}")
        return node

    def best_child(self):
        """Return the child maximising UCT, or the root when none is registered."""
        def uct(node: "Node"):
            # Unvisited nodes use c_unvisited as a pseudo visit count.
            n_visits = node.visited if node.visited else self.c_unvisited
            avg_value = node.avg_value() if node.visited else node.value/self.c_unvisited
            return avg_value + self.c_explore * math.sqrt(math.log(node.parent.visited) / n_visits)
        if len(self.children) == 0:
            return self.root_node
        all_children = [child for children in self.children.values() for child in children]
        if not all_children:
            # Only empty child lists are registered; max() would raise on an empty sequence.
            return self.root_node
        return max(all_children, key=uct)

    async def expand(self, node : "Node", max_children=4):
        """Expand ``node``, register its children and return them."""
        await node.expand(max_children)
        if node not in self.children or not self.children[node]:
            self.children[node] = node.children
        return node.children

    async def simulate(self, node : "Node", role=None):
        """Return the reward for a random simulation (to completion) of `node`.

        Descends through randomly chosen children until a node without
        children is reached, then runs that node.
        """
        mcts_logger.log("MCTS", f"Start simulating node {node.id}:")
        while node.children:
            node = random.choice(node.children)
        reward = await node.run_node(role)
        mcts_logger.log("MCTS", f"Simulated node's reward: {reward}")
        return reward

    def backpropagate(self, node : "Node", reward):
        """Apply ``reward`` to ``node`` and to every ancestor up to the root."""
        child_node = node
        node.update(reward)
        node = node.parent
        while node is not None:
            node.update(reward, child_node)
            node, child_node = node.parent, node

    def best_path(self, root : "Node"):
        """Find the nodes with the highest normalised test and dev scores.

        Returns:
            ``{"dev_best": node, "global_best": node}``. Each entry falls back
            to ``root`` when no descendant scores above 0 on that split.
        """
        def bfs(node : "Node", best_score, best_child : "Node", split):
            # Recursive walk over the registered tree below `node`.
            assert split in ["test_score", "dev_score"]
            if node not in self.children:
                return best_score, best_child
            for child in self.children[node]:
                score = child.normalized_reward[split]
                print(child.id, score)
                if score > best_score:
                    best_score = score
                    best_child = child
                best_score, best_child = bfs(child, best_score, best_child, split)
            return best_score, best_child
        # The two searches are independent: the dev search must start from the
        # root too, not from the node that happened to win on the test split.
        _, best_child = bfs(root, 0, root, "test_score")
        _, dev_best_child = bfs(root, 0, root, "dev_score")
        return {"dev_best": dev_best_child,
                "global_best": best_child}

    def get_num_simulations(self):
        """Return the number of times the root node has been updated."""
        return self.root_node.visited

    async def search(self, task, data_config, name,
                     rollouts, load_tree=False, low_is_better=False, reflection=False):
        """Run the tree search and return the best nodes found.

        Args:
            task: Dataset key in ``data_config["datasets"]``.
            data_config: Parsed data configuration mapping.
            name: Suffix for the node directory.
            rollouts: Total number of rollouts. Without a loaded tree the
                first two are spent on the root and one random child.
            load_tree: Try to resume from pickled nodes in the node directory.
            low_is_better: Whether smaller metric values are better.
            reflection: Forwarded to the root role.

        Returns:
            The dictionary produced by ``best_path``.

        Raises:
            ValueError: If no tree was loaded and ``rollouts`` is below 2.
        """
        role, root = initialize_di_root_node(task, data_config, low_is_better=low_is_better, reflection=reflection, name=name)
        self.root_node = root
        tree_loaded = False
        if load_tree:
            tree_loaded = self.load_tree()
            mcts_logger.log("MCTS", f"Number of simulations: {self.get_num_simulations()}")
            mcts_logger.log("MCTS", f"Tree loaded: {tree_loaded}")
        if not tree_loaded:
            rollouts -= 2 # 2 rollouts for the initial tree
            if rollouts < 0:
                raise ValueError("Rollouts must be at least 2 if there is no tree to load")
            self.children[root] = []
            reward = await self.simulate(root, role)
            self.backpropagate(root, reward)
            children = await self.expand(root)
            # Currently a single child is picked at random; this could later
            # be extended to several.
            first_leaf = random.choice(children)
            reward = await self.simulate(first_leaf)
            self.backpropagate(first_leaf, reward)
        else:
            root = self.root_node
        # Remaining iterations: select with UCT, expand, simulate and backpropagate.
        for rollout_idx in range(rollouts): # number of rollouts
            mcts_logger.log("MCTS", f"Start the next rollout {rollout_idx+1}")
            node = self.select(root)
            if node.is_terminal():
                if node.raw_value == 0:
                    reward = await self.simulate(node)
                else:
                    # Reuse the node's per-visit score. Using the cumulative
                    # node.value here would double the value on every revisit.
                    reward = {"test_score": node.raw_value, "score": node.avg_value()}
                mcts_logger.log("MCTS", f"Terminal node's reward: {reward}")
                self.backpropagate(node, reward)
            else:
                if node.visited > 0:
                    children = await self.expand(node)
                    node = random.choice(children)
                reward = await self.simulate(node)
                self.backpropagate(node, reward)
        return self.best_path(root)

    def load_tree(self):
        """Restore the tree from ``Node-0.pkl`` in the root's node directory.

        Returns:
            True when the root pickle exists and the tree map is non-empty
            afterwards, otherwise False.
        """
        def load_children_node(node):
            mcts_logger.log("MCTS", f"Load node {node.id}'s child: {node.children}")
            if node.is_terminal() or not node.children:
                return
            for child in node.children:
                # NOTE(review): the loaded copy is discarded; the child objects
                # unpickled together with the root are the ones kept in the tree.
                child.load_node()
                self.children[child] = child.children
                load_children_node(child)
        root_pkl = os.path.join(self.root_node.state["node_dir"], "Node-0.pkl")
        if os.path.exists(root_pkl):
            # NOTE(security): unpickling runs arbitrary code; only resume from
            # a trusted working directory.
            with open(root_pkl, 'rb') as f:
                self.root_node = pickle.load(f)
            self.children[self.root_node] = self.root_node.children
            load_children_node(self.root_node)
            if self.children:
                return True
        return False

114
expo/README.md Normal file
View file

@ -0,0 +1,114 @@
# Expo
## 1. Data Preparation
- 下载数据集:https://deepwisdom.feishu.cn/drive/folder/RVyofv9cvlvtxKdddt2cyn3BnTc?from=from_copylink
- 修改`data.yaml`中的`datasets_dir`为数据集合集根目录的存储位置
## 2. Configs
### Data Config
`datasets.yaml` 提供数据集对应的指标和基础提示词
`data.yaml` 继承了`datasets.yaml`以及一些路径信息,需要将`datasets_dir`指到数据集合集的根目录下
### LLM Config
```
llm:
api_type: 'openai'
model: deepseek-coder
base_url: "https://oneapi.deepwisdom.ai/v1"
api_key: sk-xxx
temperature: 0.5
```
### Budget
实验轮次 k = 10, 20
### 提示词使用
通过执行`dataset.py`中的`generate_task_requirement`函数获取提示词
## 3. Evaluation
运行各个框架;运行后,框架需要提供Dev和Test的预测文件`dev_predictions.csv`和`test_predictions.csv`,列名(column name)为`target`
两种评估方式
1. `evaluation.py`:提供pred和原始的gt(均为1D iterable)以及需要使用的metric,返回evaluation score
2. 使用`CustomExperimenter`
```
experimenter = CustomExperimenter(task="titanic")
score_dict = experimenter.evaluate_pred_files(dev_pred_path, test_pred_path)
```
## 4. Baselines
### DS Agent
提供github链接并说明使用的命令以及参数设置
### AIDE
提供github链接并说明使用的命令以及参数设置
### Autogluon
提供github链接并说明使用的命令以及参数设置
### Base DI
For setup, check 5.
- `python run_experiment.py --exp_mode base --task titanic`
### DI RandomSearch
For setup, check 5.
- Single insight
`python run_experiment.py --exp_mode aug --task titanic --aug_mode single`
- Set insight
`python run_experiment.py --exp_mode aug --task titanic --aug_mode set`
## 5. DI MCTS
### Run DI MCTS
#### Setup
In the root directory,
```
pip install -e .
cd expo
pip install -r requirements.txt
```
#### Run
- `python run_experiment.py --exp_mode mcts --task titanic --rollout 5`
If the dataset uses a regression metric (e.g. `rmse`), remember to pass `--low_is_better`:
- `python run_experiment.py --exp_mode mcts --task house-prices --rollout 5 --low_is_better`

160
expo/data.yaml Normal file
View file

@ -0,0 +1,160 @@
datasets_dir: "D:/work/automl/datasets" # path to the datasets directory
datasets:
titanic:
dataset: 04_titanic
metric: f1
target_col: Survived
user_requirement: "This is a 04_titanic dataset. Your goal is to predict the target\
\ column `Survived`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
house-prices:
dataset: 05_house-prices-advanced-regression-techniques
metric: rmse
target_col: SalePrice
user_requirement: "This is a 05_house-prices-advanced-regression-techniques dataset.\
\ Your goal is to predict the target column `SalePrice`.\nPerform data analysis,\
\ data preprocessing, feature engineering, and modeling to predict the target.\
\ \nReport rmse on the eval data. Do not plot or make any visualizations.\n"
santander-customer:
dataset: 06_santander-customer-transaction-prediction
metric: f1
target_col: target
user_requirement: "This is a 06_santander-customer-transaction-prediction dataset.\
\ Your goal is to predict the target column `target`.\nPerform data analysis,\
\ data preprocessing, feature engineering, and modeling to predict the target.\
\ \nReport f1 on the eval data. Do not plot or make any visualizations.\n"
icr:
dataset: 07_icr-identify-age-related-conditions
metric: f1
target_col: Class
user_requirement: "This is a 07_icr-identify-age-related-conditions dataset. Your\
\ goal is to predict the target column `Class`.\nPerform data analysis, data\
\ preprocessing, feature engineering, and modeling to predict the target. \n\
Report f1 on the eval data. Do not plot or make any visualizations.\n"
Click_prediction_small:
dataset: Click_prediction_small
metric: f1
target_col: click
user_requirement: "This is a Click_prediction_small dataset. Your goal is to predict\
\ the target column `click`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 on the eval data.\
\ Do not plot or make any visualizations.\n"
GesturePhaseSegmentationProcessed:
dataset: GesturePhaseSegmentationProcessed
metric: f1 weighted
target_col: Phase
user_requirement: "This is a GesturePhaseSegmentationProcessed dataset. Your goal\
\ is to predict the target column `Phase`.\nPerform data analysis, data preprocessing,\
\ feature engineering, and modeling to predict the target. \nReport f1 weighted\
\ on the eval data. Do not plot or make any visualizations.\n"
Moneyball:
dataset: Moneyball
metric: rmse
target_col: RS
user_requirement: "This is a Moneyball dataset. Your goal is to predict the target\
\ column `RS`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport rmse on the eval data. Do not\
\ plot or make any visualizations.\n"
SAT11-HAND-runtime-regression:
dataset: SAT11-HAND-runtime-regression
metric: rmse
target_col: runtime
user_requirement: "This is a SAT11-HAND-runtime-regression dataset. Your goal\
\ is to predict the target column `runtime`.\nPerform data analysis, data preprocessing,\
\ feature engineering, and modeling to predict the target. \nReport rmse on\
\ the eval data. Do not plot or make any visualizations.\n"
boston:
dataset: boston
metric: rmse
target_col: MEDV
user_requirement: "This is a boston dataset. Your goal is to predict the target\
\ column `MEDV`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport rmse on the eval data. Do not\
\ plot or make any visualizations.\n"
colleges:
dataset: colleges
metric: rmse
target_col: percent_pell_grant
user_requirement: "This is a colleges dataset. Your goal is to predict the target\
\ column `percent_pell_grant`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport rmse on the eval\
\ data. Do not plot or make any visualizations.\n"
credit-g:
dataset: credit-g
metric: f1
target_col: class
user_requirement: "This is a credit-g dataset. Your goal is to predict the target\
\ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
diamonds:
dataset: diamonds
metric: rmse
target_col: price
user_requirement: "This is a diamonds dataset. Your goal is to predict the target\
\ column `price`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport rmse on the eval data. Do not\
\ plot or make any visualizations.\n"
jasmine:
dataset: jasmine
metric: f1
target_col: class
user_requirement: "This is a jasmine dataset. Your goal is to predict the target\
\ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
kc1:
dataset: kc1
metric: f1
target_col: defects
user_requirement: "This is a kc1 dataset. Your goal is to predict the target column\
\ `defects`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
kick:
dataset: kick
metric: f1
target_col: IsBadBuy
user_requirement: "This is a kick dataset. Your goal is to predict the target\
\ column `IsBadBuy`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
mfeat-factors:
dataset: mfeat-factors
metric: f1 weighted
target_col: class
user_requirement: "This is a mfeat-factors dataset. Your goal is to predict the\
\ target column `class`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"
segment:
dataset: segment
metric: f1 weighted
target_col: class
user_requirement: "This is a segment dataset. Your goal is to predict the target\
\ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 weighted on the eval data.\
\ Do not plot or make any visualizations.\n"
steel-plates-fault:
dataset: steel-plates-fault
metric: f1 weighted
target_col: target
user_requirement: "This is a steel-plates-fault dataset. Your goal is to predict\
\ the target column `target`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"
wine-quality-white:
dataset: wine-quality-white
metric: f1 weighted
target_col: Class
user_requirement: "This is a wine-quality-white dataset. Your goal is to predict\
\ the target column `Class`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"
work_dir: ../workspace # path to the workspace directory
role_dir: storage/team/environment/roles/ResearchAssistant_David
# analysis_pool_dir: D:/work/MG-open/MetaGPT/examples/MCTS_test/analysis_pool_sample.json

313
expo/dataset.py Normal file
View file

@ -0,0 +1,313 @@
import openml
from pathlib import Path
from sklearn.model_selection import train_test_split
import os
import json
import yaml
import pandas as pd
from expo.insights.solution_designer import SolutionDesigner
import asyncio
# Short per-dataset requirement text. ExpDataset.create_base_requirement fills
# in the dataset name, the target column and the metric.
BASE_USER_REQUIREMENT = """\
This is a {datasetname} dataset. Your goal is to predict the target column `{target_col}`.
Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target.
Report {metric} on the eval data. Do not plot or make any visualizations.
"""
# Full task prompt assembled by generate_task_requirement: the user requirement
# followed by fixed rules about label leakage, the prediction files to save,
# the train-score print format, and the data/output paths.
TASK_PROMPT = """\
# User requirement
{user_requirement}
**Attention**
1. Please do not leak the target label in any form during training.
2. Dev and Test sets do not have the target column.
3. You should perform transformations on all sets at the same step.
4. If labels are transformed during training, they should be transformed back to the original format before saving the predictions.
## Saving Dev and Test Predictions
1. Save the prediction results of BOTH the dev set and test set in `dev_predictions.csv` and `test_predictions.csv` respectively in the output directory.
- Both files should contain a single column named `target` with the predicted values.
2. Make sure the prediction results are in the same format as the target column in the training set.
- The labels should be transformed back to the original format if any transformation was applied during training.
## Output Training Set Performance
Make sure the performance of the model is printed in python in the last step even if it has been printed in the previous steps. The value should be a float number.
Print the training set performance in the last step. Write in this format:
```python
...
print("Train score:", train_score)
```
# Data dir
training: {train_path}
dev: {dev_path}
testing: {test_path}
# Output dir
{output_dir}
"""
# Random seed passed to both train_test_split calls in ExpDataset.split_and_save.
SEED = 100
# Fraction of the raw data kept for train+dev; the remainder becomes the test split.
TRAIN_TEST_SPLIT = 0.8
# Fraction of the train+dev part kept for training; the remainder becomes the dev split.
TRAIN_DEV_SPLIT = 0.75
# OpenML dataset ids processed by the __main__ script below.
# The group labels are the original author's; presumably regression,
# binary classification and multi-class classification — TODO confirm.
OPENML_DATASET_IDS = [
# reg
41021,
42727,
41980,
42225,
531,
# cls
41143,
31,
42733,
41162,
1067,
# multi cls
40498,
40982,
12,
40984,
4538,
]
# (dataset directory name, target column) pairs for datasets that are not
# fetched from OpenML; ExpDataset reads their data from <dir>/raw/train.csv.
CUSTOM_DATASETS = [
("04_titanic", "Survived"),
("05_house-prices-advanced-regression-techniques", "SalePrice"),
("06_santander-customer-transaction-prediction", "target"),
("07_icr-identify-age-related-conditions", "Class")
]
def get_split_dataset_path(dataset_name, config):
    """Return the paths of all split CSV files for one configured dataset.

    Args:
        dataset_name: Key inside ``config['datasets']``.
        config: Mapping with ``'datasets_dir'`` and ``'datasets'`` entries.

    Returns:
        Dict mapping ``train``, ``dev``, ``dev_wo_target``, ``dev_target``,
        ``test``, ``test_wo_target`` and ``test_target`` to file paths.

    Raises:
        ValueError: If ``dataset_name`` is not a configured dataset.
    """
    root = config['datasets_dir']
    if dataset_name not in config['datasets']:
        raise ValueError(f"Dataset {dataset_name} not found in config file. Available datasets: {config['datasets'].keys()}")
    data_path = os.path.join(root, config['datasets'][dataset_name]['dataset'])
    split_names = (
        "train",
        "dev",
        "dev_wo_target",
        "dev_target",
        "test",
        "test_wo_target",
        "test_target",
    )
    return {split: os.path.join(data_path, f"split_{split}.csv") for split in split_names}
def get_user_requirement(task_name, config):
    """Look up a task's data directory and its base requirement text.

    Args:
        task_name: Key inside ``config['datasets']``.
        config: Mapping with ``'datasets_dir'`` and ``'datasets'`` entries.

    Returns:
        A ``(data_path, user_requirement)`` tuple.

    Raises:
        ValueError: If ``task_name`` is not a configured dataset.
    """
    root = config['datasets_dir']
    if task_name not in config['datasets']:
        raise ValueError(f"Dataset {task_name} not found in config file. Available datasets: {config['datasets'].keys()}")
    entry = config['datasets'][task_name]
    data_path = os.path.join(root, entry['dataset'])
    return data_path, entry['user_requirement']
def save_datasets_dict_to_yaml(datasets_dict):
    """Dump ``datasets_dict`` as YAML into ``datasets.yaml`` in the current directory."""
    target_name = "datasets.yaml"
    with open(target_name, "w") as handle:
        yaml.dump(datasets_dict, stream=handle)
def create_dataset_dict(dataset):
    """Summarise a dataset object as the dict stored in the datasets YAML.

    Args:
        dataset: Object exposing ``name``, ``target_col``,
            ``create_base_requirement()`` and ``get_metric()``.

    Returns:
        Dict with the keys ``dataset``, ``user_requirement``, ``metric`` and
        ``target_col`` (in that order).
    """
    dataset_name = dataset.name
    requirement = dataset.create_base_requirement()
    metric = dataset.get_metric()
    target = dataset.target_col
    return dict(
        dataset=dataset_name,
        user_requirement=requirement,
        metric=metric,
        target_col=target,
    )
def generate_task_requirement(task_name, data_config):
    """Build the full task prompt for one dataset.

    Args:
        task_name: Key inside ``data_config['datasets']``.
        data_config: Mapping with ``'datasets_dir'``, ``'datasets'`` and
            ``'work_dir'`` entries.

    Returns:
        ``TASK_PROMPT`` filled with the dataset's requirement text, the train
        path, the target-free dev/test paths and the output directory
        ``<work_dir>/<task_name>``. The prompt is also printed.

    Raises:
        ValueError: If ``task_name`` is not a configured dataset.
    """
    # get_user_requirement returns (data_path, requirement_text). Only the text
    # belongs in the prompt; formatting the whole tuple would embed its repr.
    _data_path, user_requirement = get_user_requirement(task_name, data_config)
    split_dataset_path = get_split_dataset_path(task_name, data_config)
    train_path = split_dataset_path["train"]
    dev_path = split_dataset_path["dev_wo_target"]
    test_path = split_dataset_path["test_wo_target"]
    work_dir = data_config["work_dir"]
    output_dir = f"{work_dir}/{task_name}"
    user_requirement = TASK_PROMPT.format(user_requirement=user_requirement,
                                          train_path=train_path, dev_path=dev_path, test_path=test_path,
                                          output_dir=output_dir)
    print(user_requirement)
    return user_requirement
class ExpDataset:
    """A dataset stored under ``<dataset_dir>/<name>`` with train/dev/test splits.

    The raw data is read from ``raw/train.csv``. Constructing an instance
    writes the split CSV files and ``dataset_info.json`` when they are missing
    or when ``force_update`` is set.
    """
    description : str = None
    metadata : dict = None
    dataset_dir : str = None
    target_col : str = None
    name : str = None

    def __init__(self, name, dataset_dir, **kwargs):
        """Store the settings and make sure the split files exist.

        Args:
            name: Dataset directory name below ``dataset_dir``.
            dataset_dir: Root directory holding all datasets.
            **kwargs: ``target_col`` (default ``None``) and ``force_update``
                (default ``False``) are read; other keys are ignored.
        """
        self.name = name
        self.dataset_dir = dataset_dir
        self.target_col = kwargs.get("target_col")
        self.force_update = kwargs.get("force_update", False)
        self.save_dataset(target_col=self.target_col)

    def check_dataset_exists(self):
        """Return True only if all seven split CSV files are present."""
        required_files = (
            "split_train.csv", "split_dev.csv", "split_test.csv",
            "split_dev_wo_target.csv", "split_dev_target.csv",
            "split_test_wo_target.csv", "split_test_target.csv",
        )
        return all(
            os.path.exists(Path(self.dataset_dir, self.name, fname))
            for fname in required_files
        )

    def check_datasetinfo_exists(self):
        """Return True if ``dataset_info.json`` is present."""
        info_path = Path(self.dataset_dir, self.name, "dataset_info.json")
        return os.path.exists(info_path)

    def get_raw_dataset(self):
        """Read ``raw/train.csv`` into a DataFrame.

        Raises:
            FileNotFoundError: If the raw CSV file does not exist.
        """
        raw_dir = Path(self.dataset_dir, self.name, "raw")
        raw_csv = Path(raw_dir, "train.csv")
        if os.path.exists(raw_csv):
            return pd.read_csv(raw_csv)
        raise FileNotFoundError(f"Raw dataset `train.csv` not found in {raw_dir}")

    def get_dataset_info(self):
        """Compute summary metadata and a text preview from the raw CSV.

        Returns:
            Dict with ``name``, ``description`` (empty string), ``target_col``,
            ``metadata`` (counts of classes, features, instances, missing
            values and numeric/object columns) and ``df_head``.
        """
        frame = pd.read_csv(Path(self.dataset_dir, self.name, "raw", "train.csv"))
        missing_mask = frame.isnull()
        n_rows, n_cols = frame.shape
        metadata = {
            'NumberOfClasses': frame[self.target_col].nunique(),
            'NumberOfFeatures': n_cols,
            'NumberOfInstances': n_rows,
            'NumberOfInstancesWithMissingValues': int(missing_mask.any(axis=1).sum()),
            'NumberOfMissingValues': int(missing_mask.sum().sum()),
            'NumberOfNumericFeatures': frame.select_dtypes(include=['number']).shape[1],
            'NumberOfSymbolicFeatures': frame.select_dtypes(include=['object']).shape[1],
        }
        return {
            "name": self.name,
            "description": "",
            "target_col": self.target_col,
            "metadata": metadata,
            "df_head": frame.head().to_string(index=False),
        }

    def get_metric(self):
        """Pick the metric name from the number of distinct target values.

        Returns:
            ``"f1"`` for 2 values, ``"f1 weighted"`` for 3 to 200 values and
            ``"rmse"`` for more than 200 values or none.

        Raises:
            ValueError: For any other count.
        """
        class_count = self.get_dataset_info()["metadata"]["NumberOfClasses"]
        if class_count == 2:
            return "f1"
        if 2 < class_count <= 200:
            return "f1 weighted"
        if class_count > 200 or class_count == 0:
            return "rmse"
        raise ValueError(f"Number of classes {class_count} not supported")

    def create_base_requirement(self):
        """Return ``BASE_USER_REQUIREMENT`` filled in for this dataset."""
        return BASE_USER_REQUIREMENT.format(
            datasetname=self.name,
            target_col=self.target_col,
            metric=self.get_metric(),
        )

    def save_dataset(self, target_col):
        """Write split files and dataset info unless they already exist.

        The raw dataset is always loaded first; ``force_update`` makes both
        outputs get rewritten.
        """
        raw_frame = self.get_raw_dataset()
        needs_split = self.force_update or not self.check_dataset_exists()
        if needs_split:
            print(f"Saving Dataset {self.name} in {self.dataset_dir}")
            self.split_and_save(raw_frame, target_col)
        else:
            print(f"Dataset {self.name} already exists")
        needs_info = self.force_update or not self.check_datasetinfo_exists()
        if needs_info:
            print(f"Saving Dataset info for {self.name}")
            self.save_datasetinfo(self.get_dataset_info())
        else:
            print(f"Dataset info for {self.name} already exists")

    def save_datasetinfo(self, dataset_info):
        """Write ``dataset_info`` to ``dataset_info.json`` with 4-space indent."""
        info_path = Path(self.dataset_dir, self.name, "dataset_info.json")
        with open(info_path, "w") as handle:
            json.dump(dataset_info, handle, indent=4)

    def save_split_datasets(self, df, split, target_col=None):
        """Write one split to disk.

        Always writes ``split_<split>.csv``. When ``target_col`` is given it
        also writes ``split_<split>_wo_target.csv`` (target column removed)
        and ``split_<split>_target.csv`` (a single column named ``target``).
        """
        out_dir = Path(self.dataset_dir, self.name)
        df.to_csv(Path(out_dir, f"split_{split}.csv"), index=False)
        if not target_col:
            return
        features_only = df.drop(columns=[target_col])
        features_only.to_csv(Path(out_dir, f"split_{split}_wo_target.csv"), index=False)
        # The target file always exposes the label under the name "target".
        labels = df[[target_col]].rename(columns={target_col: "target"})
        labels.to_csv(Path(out_dir, f"split_{split}_target.csv"), index=False)

    def split_and_save(self, df, target_col):
        """Split ``df`` into train/dev/test and save all three splits.

        Raises:
            ValueError: If ``target_col`` is empty or ``None``.
        """
        if not target_col:
            raise ValueError("Target column not provided")
        train_dev_part, test_part = train_test_split(df, test_size=1-TRAIN_TEST_SPLIT, random_state=SEED)
        train_part, dev_part = train_test_split(train_dev_part, test_size=1-TRAIN_DEV_SPLIT, random_state=SEED)
        self.save_split_datasets(train_part, "train")
        self.save_split_datasets(dev_part, "dev", target_col)
        self.save_split_datasets(test_part, "test", target_col)
class OpenMLExpDataset(ExpDataset):
    """An ``ExpDataset`` whose raw data is fetched from OpenML by dataset id."""

    def __init__(self, name, dataset_dir, dataset_id, **kwargs):
        """Fetch the OpenML dataset description and prepare the splits.

        Args:
            name: Not used; the name reported by OpenML is used instead.
            dataset_dir: Root directory holding all datasets.
            dataset_id: OpenML dataset id.
            **kwargs: Forwarded to ``ExpDataset.__init__`` (e.g. ``force_update``).
        """
        self.dataset_id = dataset_id
        # Only the description and feature metadata are requested here; the
        # data itself is fetched in get_raw_dataset.
        self.dataset = openml.datasets.get_dataset(self.dataset_id,
                                                   download_data=False,
                                                   download_qualities=False,
                                                   download_features_meta_data=True)
        self.name = self.dataset.name
        self.target_col = self.dataset.default_target_attribute
        super().__init__(self.name, dataset_dir, target_col=self.target_col, **kwargs)

    def get_raw_dataset(self):
        """Fetch the data from OpenML, store it as ``raw/train.csv`` and return it."""
        dataset = self.dataset
        dataset_df, *_ = dataset.get_data()
        raw_dir = Path(self.dataset_dir, self.name, "raw")
        os.makedirs(raw_dir, exist_ok=True)
        dataset_df.to_csv(Path(raw_dir, "train.csv"), index=False)
        return dataset_df

    def get_dataset_info(self):
        """Extend the base info with OpenML's name, description and qualities."""
        dataset_info = super().get_dataset_info()
        dataset = self.dataset
        dataset_info["name"] = dataset.name
        dataset_info["description"] = dataset.description
        # Qualities are not requested in __init__ and may be unavailable
        # (None); dict.update(None) would raise TypeError.
        dataset_info["metadata"].update(dataset.qualities or {})
        return dataset_info
# class HFExpDataset(ExpDataset):
# def __init__(self, name, dataset_dir, dataset_name, **kwargs):
# super().__init__(name, dataset_dir, **kwargs)
async def process_dataset(dataset, solution_designer, save_analysis_pool, datasets_dict):
    """Register one dataset and optionally generate its analysis pool.

    Args:
        dataset: Dataset object exposing ``name`` and ``get_dataset_info()``.
        solution_designer: Object whose ``generate_solutions`` coroutine is
            awaited when ``save_analysis_pool`` is true.
        save_analysis_pool: Whether to generate solutions for the dataset.
        datasets_dict: Mapping with a ``"datasets"`` dict; the dataset's
            summary is stored there under ``dataset.name``.
    """
    if save_analysis_pool:
        # This coroutine already runs inside an event loop, so the nested
        # coroutine must be awaited; asyncio.run() here raises RuntimeError.
        await solution_designer.generate_solutions(dataset.get_dataset_info(), dataset.name)
    dataset_dict = create_dataset_dict(dataset)
    datasets_dict["datasets"][dataset.name] = dataset_dict
if __name__ == "__main__":
    # Script entry point: prepare every OpenML and custom dataset, collect
    # their summaries and write them to datasets.yaml.
    root_dir = "D:/work/automl/datasets"
    refresh = False
    with_analysis_pool = False
    collected = {"datasets": {}}
    designer = SolutionDesigner()

    def _register(prepared_dataset):
        # One event loop run per dataset, in construction order.
        asyncio.run(process_dataset(prepared_dataset, designer, with_analysis_pool, collected))

    for openml_id in OPENML_DATASET_IDS:
        _register(OpenMLExpDataset("", root_dir, openml_id, force_update=refresh))
    for custom_name, custom_target in CUSTOM_DATASETS:
        _register(ExpDataset(custom_name, root_dir, target_col=custom_target, force_update=refresh))
    save_datasets_dict_to_yaml(collected)

153
expo/datasets.yaml Normal file
View file

@ -0,0 +1,153 @@
datasets:
titanic:
dataset: 04_titanic
metric: f1
target_col: Survived
user_requirement: "This is a 04_titanic dataset. Your goal is to predict the target\
\ column `Survived`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
house-prices:
dataset: 05_house-prices-advanced-regression-techniques
metric: rmse
target_col: SalePrice
user_requirement: "This is a 05_house-prices-advanced-regression-techniques dataset.\
\ Your goal is to predict the target column `SalePrice`.\nPerform data analysis,\
\ data preprocessing, feature engineering, and modeling to predict the target.\
\ \nReport rmse on the eval data. Do not plot or make any visualizations.\n"
santander-customer:
dataset: 06_santander-customer-transaction-prediction
metric: f1
target_col: target
user_requirement: "This is a 06_santander-customer-transaction-prediction dataset.\
\ Your goal is to predict the target column `target`.\nPerform data analysis,\
\ data preprocessing, feature engineering, and modeling to predict the target.\
\ \nReport f1 on the eval data. Do not plot or make any visualizations.\n"
icr:
dataset: 07_icr-identify-age-related-conditions
metric: f1
target_col: Class
user_requirement: "This is a 07_icr-identify-age-related-conditions dataset. Your\
\ goal is to predict the target column `Class`.\nPerform data analysis, data\
\ preprocessing, feature engineering, and modeling to predict the target. \n\
Report f1 on the eval data. Do not plot or make any visualizations.\n"
Click_prediction_small:
dataset: Click_prediction_small
metric: f1
target_col: click
user_requirement: "This is a Click_prediction_small dataset. Your goal is to predict\
\ the target column `click`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 on the eval data.\
\ Do not plot or make any visualizations.\n"
GesturePhaseSegmentationProcessed:
dataset: GesturePhaseSegmentationProcessed
metric: f1 weighted
target_col: Phase
user_requirement: "This is a GesturePhaseSegmentationProcessed dataset. Your goal\
\ is to predict the target column `Phase`.\nPerform data analysis, data preprocessing,\
\ feature engineering, and modeling to predict the target. \nReport f1 weighted\
\ on the eval data. Do not plot or make any visualizations.\n"
Moneyball:
dataset: Moneyball
metric: rmse
target_col: RS
user_requirement: "This is a Moneyball dataset. Your goal is to predict the target\
\ column `RS`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport rmse on the eval data. Do not\
\ plot or make any visualizations.\n"
SAT11-HAND-runtime-regression:
dataset: SAT11-HAND-runtime-regression
metric: rmse
target_col: runtime
user_requirement: "This is a SAT11-HAND-runtime-regression dataset. Your goal\
\ is to predict the target column `runtime`.\nPerform data analysis, data preprocessing,\
\ feature engineering, and modeling to predict the target. \nReport rmse on\
\ the eval data. Do not plot or make any visualizations.\n"
boston:
dataset: boston
metric: rmse
target_col: MEDV
user_requirement: "This is a boston dataset. Your goal is to predict the target\
\ column `MEDV`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport rmse on the eval data. Do not\
\ plot or make any visualizations.\n"
colleges:
dataset: colleges
metric: rmse
target_col: percent_pell_grant
user_requirement: "This is a colleges dataset. Your goal is to predict the target\
\ column `percent_pell_grant`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport rmse on the eval\
\ data. Do not plot or make any visualizations.\n"
credit-g:
dataset: credit-g
metric: f1
target_col: class
user_requirement: "This is a credit-g dataset. Your goal is to predict the target\
\ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
diamonds:
dataset: diamonds
metric: rmse
target_col: price
user_requirement: "This is a diamonds dataset. Your goal is to predict the target\
\ column `price`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport rmse on the eval data. Do not\
\ plot or make any visualizations.\n"
jasmine:
dataset: jasmine
metric: f1
target_col: class
user_requirement: "This is a jasmine dataset. Your goal is to predict the target\
\ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
kc1:
dataset: kc1
metric: f1
target_col: defects
user_requirement: "This is a kc1 dataset. Your goal is to predict the target column\
\ `defects`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
kick:
dataset: kick
metric: f1
target_col: IsBadBuy
user_requirement: "This is a kick dataset. Your goal is to predict the target\
\ column `IsBadBuy`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 on the eval data. Do not plot\
\ or make any visualizations.\n"
mfeat-factors:
dataset: mfeat-factors
metric: f1 weighted
target_col: class
user_requirement: "This is a mfeat-factors dataset. Your goal is to predict the\
\ target column `class`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"
segment:
dataset: segment
metric: f1 weighted
target_col: class
user_requirement: "This is a segment dataset. Your goal is to predict the target\
\ column `class`.\nPerform data analysis, data preprocessing, feature engineering,\
\ and modeling to predict the target. \nReport f1 weighted on the eval data.\
\ Do not plot or make any visualizations.\n"
steel-plates-fault:
dataset: steel-plates-fault
metric: f1 weighted
target_col: target
user_requirement: "This is a steel-plates-fault dataset. Your goal is to predict\
\ the target column `target`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"
wine-quality-white:
dataset: wine-quality-white
metric: f1 weighted
target_col: Class
user_requirement: "This is a wine-quality-white dataset. Your goal is to predict\
\ the target column `Class`.\nPerform data analysis, data preprocessing, feature\
\ engineering, and modeling to predict the target. \nReport f1 weighted on the\
\ eval data. Do not plot or make any visualizations.\n"

View file

@ -0,0 +1,23 @@
from sklearn.metrics import f1_score, accuracy_score, roc_auc_score, mean_squared_error
import numpy as np
def evaluate_score(pred, gt, metric):
    """Score predictions against ground truth with the named metric.

    Args:
        pred: Predicted values, forwarded to the scikit-learn metric.
        gt: Ground-truth values, forwarded to the scikit-learn metric.
        metric: One of ``"accuracy"``, ``"f1"``, ``"f1 weighted"``,
            ``"roc_auc"``, ``"rmse"`` or ``"log rmse"``.

    Returns:
        The metric value.

    Raises:
        ValueError: If ``metric`` is not one of the supported names.
    """
    if metric == "accuracy":
        return accuracy_score(gt, pred)
    elif metric == "f1":
        unique_classes = np.unique(gt)
        # Prefer 1 as the positive label when the labels include both 0 and 1;
        # otherwise use the first of exactly two labels, else leave it unset.
        if 1 in unique_classes and 0 in unique_classes:
            pos_label = 1
        else:
            pos_label = unique_classes[0] if len(unique_classes) == 2 else None
        return f1_score(gt, pred, pos_label=pos_label)
    elif metric == "f1 weighted":
        return f1_score(gt, pred, average="weighted")
    elif metric == "roc_auc":
        return roc_auc_score(gt, pred)
    elif metric == "rmse":
        # The `squared=False` keyword was deprecated in scikit-learn 1.4 and
        # removed in 1.6; taking the root explicitly works on every version
        # and is identical for the single-column targets used here.
        return np.sqrt(mean_squared_error(gt, pred))
    elif metric == "log rmse":
        return np.sqrt(mean_squared_error(np.log1p(gt), np.log1p(pred)))
    else:
        raise ValueError(f"Metric {metric} not supported")

View file

@ -0,0 +1,54 @@
from expo.MCTS import Node, MCTS
import textwrap
# Text layout for one tree node, filled in by visualize_node() inside
# get_tree_text(). Placeholders: id, plans, simulated, score, num_visits.
NODE_TEMPLATE = """\
[Node {id}]
Plans:
{plans}
Simulated: {simulated}
Score: {score}, Visits: {num_visits}
"""
def get_role_plans(role):
    """Return the role's task instructions as ``"<n>. <instruction>"`` strings, numbered from 1."""
    return [
        f"{position}. {task.instruction}"
        for position, task in enumerate(role.planner.plan.tasks, start=1)
    ]
def get_tree_text(node : Node):
    """Render the tree rooted at ``node`` as indented text.

    Returns:
        A ``(text, count)`` tuple: the rendered tree, and the number of
        distinct task instructions seen across all visited nodes.
    """
    loaded_roles = {}
    seen_instructions = set()

    def load_role(node):
        # Each node's role is loaded once and reused on later lookups.
        if node.id in loaded_roles:
            return loaded_roles[node.id]
        loaded_roles[node.id] = node.load_role()
        return loaded_roles[node.id]

    def visualize_node(node : Node, previous_plans=None):
        role = load_role(node)
        numbered = [f"{i+1}. {task.instruction}" for i, task in enumerate(role.planner.plan.tasks)]
        if previous_plans is not None:
            # Show only the plan lines that differ from the parent's line at
            # the same position.
            numbered = [line for line, parent_line in zip(numbered, previous_plans) if line != parent_line]
        return NODE_TEMPLATE.format(
            id=node.id,
            plans="\n".join(numbered),
            simulated=role.state_saved,
            score=f"avg score: {node.avg_value()}, simulated score: {node.raw_reward}",
            num_visits=node.visited,
        )

    def visualize_tree(node, depth=0, previous_plans=None):
        if node is None:
            return ""
        rendered = visualize_node(node, previous_plans)
        role = load_role(node)
        seen_instructions.update(task.instruction for task in role.planner.plan.tasks)
        own_plans = get_role_plans(role)
        for child in node.children:
            rendered += textwrap.indent(visualize_tree(child, depth+1, own_plans), "\t")
        return rendered

    return visualize_tree(node), len(seen_instructions)

View file

@ -0,0 +1,4 @@
from .experimenter import Experimenter
from .mcts import MCTSExperimenter
from .aug import AugExperimenter
from .custom import CustomExperimenter

60
expo/experimenter/aug.py Normal file
View file

@ -0,0 +1,60 @@
from experimenter import Experimenter
from expo.MCTS import create_initial_state
from expo.dataset import generate_task_requirement
from expo.utils import mcts_logger, load_execute_notebook, get_exp_pool_path
from expo.insights.instruction_generator import InstructionGenerator
from expo.research_assistant import ResearchAssistant
# Suffix appended to the user requirement in AugExperimenter.run_experiment;
# `experience` receives the sampled insight text.
EXPS_PROMPT = """
When doing the tasks, you can refer to the insights below:
{experience}
"""
class AugExperimenter(Experimenter):
    """Experimenter that appends sampled analysis-pool insights to the task requirement."""
    result_path : str = "results/aug"

    async def run_experiment(self):
        """Run ``num_experiments`` insight-augmented runs and save the results.

        In ``"single"`` mode each run gets one randomly sampled insight; in
        ``"set"`` mode every run gets the same sampled instruction set.

        Raises:
            ValueError: If ``args.aug_mode`` is neither ``"single"`` nor ``"set"``.
        """
        args = self.args
        state = create_initial_state(args.task, start_task_id=1, data_config=self.data_config, low_is_better=args.low_is_better, name="")
        base_requirement = state["requirement"]
        pool_path = get_exp_pool_path(args.task, self.data_config, pool_name="ds_analysis_pool")
        pool = InstructionGenerator.load_analysis_pool(pool_path)

        mode = args.aug_mode
        if mode == "single":
            sampled = InstructionGenerator._random_sample(pool, args.num_experiments)
            exps = [entry["Analysis"] for entry in sampled]
        elif mode == "set":
            chosen = InstructionGenerator.sample_instruction_set(pool)
            joined = "\n".join([f"{entry['task_id']}: {entry['Analysis']}" for entry in chosen])
            exps = [joined] * args.num_experiments
        else:
            raise ValueError(f"Invalid mode: {mode}")

        results = []
        for idx in range(args.num_experiments):
            assistant = ResearchAssistant(node_id=str(idx), use_reflection=args.reflection)
            assistant.role_dir = f"{assistant.role_dir}_{args.task}"
            requirement = base_requirement + EXPS_PROMPT.format(experience=exps[idx])
            print(requirement)
            await assistant.run(requirement)
            extracted = await assistant.get_score()
            results.append({
                "idx": idx,
                "score_dict": self.evaluate(extracted, state),
                "aug_mode": args.aug_mode,
                "insights" : exps[idx],
                "user_requirement": requirement,
                "args": vars(args)
            })

        # The summary entry goes first, ahead of the per-run entries.
        scores = [entry["score_dict"]["test_score"] for entry in results]
        avg_score = sum(scores) / len(scores)
        best_score = min(scores) if args.low_is_better else max(scores)
        summary = {"avg_score": avg_score, "best_score": best_score, "best_score_idx": scores.index(best_score)}
        results.insert(0, summary)
        self.save_result(results)

View file

@ -0,0 +1,60 @@
from expo.experimenter import Experimenter
from expo.MCTS import create_initial_state
from expo.evaluation.evaluation import evaluate_score
import pandas as pd
import os
class CustomExperimenter(Experimenter):
    """Experimenter that scores predictions produced by an external framework.

    Keyword Args:
        framework: Required. Object whose awaitable ``run(requirement)``
            returns a mapping with ``"test_preds"`` and ``"dev_preds"``.
        task: Task name; defaults to ``args.task``.
        low_is_better: Defaults to ``args.low_is_better``.
        name: Suffix used for the result directory and the node directory.
    """
    result_path : str = "results/custom"

    def __init__(self, args, **kwargs):
        super().__init__(args, **kwargs)
        self.framework = kwargs["framework"] # todo
        self.task = kwargs.get("task", self.args.task)
        self.low_is_better = kwargs.get("low_is_better", self.args.low_is_better)
        self.name = kwargs.get("name", "")
        self.result_path = f"results/custom_{self.name}"
        self.state = create_initial_state(self.task, start_task_id=1, data_config=self.data_config, low_is_better=self.low_is_better, name=self.name)

    async def run_experiment(self):
        """Run the framework on the task requirement, score its predictions and save the result."""
        user_requirement = self.state["requirement"]
        preds = await self.framework.run(user_requirement)
        test_preds = preds["test_preds"]
        dev_preds = preds["dev_preds"]
        score_dict = {
            "dev_score": self.evaluate_predictions(dev_preds, "dev"),
            "test_score": self.evaluate_predictions(test_preds, "test")
        }
        results = {
            "score_dict": score_dict,
            "user_requirement": user_requirement,
            "args": vars(self.args)
        }
        self.save_result(results)

    def evaluate_pred_files(self, dev_pred_path, test_pred_path):
        """Score prediction CSV files (each with a ``target`` column).

        Returns:
            Dict with ``dev_score`` and ``test_score``.
        """
        dev_preds = pd.read_csv(dev_pred_path)["target"]
        test_preds = pd.read_csv(test_pred_path)["target"]
        # Previously called self.evaluate_score, which this class does not
        # define; the per-split scorer is evaluate_predictions.
        score_dict = {
            "dev_score": self.evaluate_predictions(dev_preds, "dev"),
            "test_score": self.evaluate_predictions(test_preds, "test")
        }
        return score_dict

    def evaluate_predictions(self, preds, split):
        """Score ``preds`` against the ``target`` column of the ``<split>_target`` file."""
        metric = self.state["dataset_config"]["metric"]
        gt_path = os.path.join(self.state["datasets_dir"][f"{split}_target"])
        gt = pd.read_csv(gt_path)["target"]
        score = evaluate_score(preds, gt, metric)
        return score

    def load_datasets(self):
        """Read the train, dev and test CSV files and return them as a ``(train, dev, test)`` tuple."""
        train_path = self.state["datasets_dir"]["train"]
        dev_path = self.state["datasets_dir"]["dev"]
        test_path = self.state["datasets_dir"]["test"]
        train = pd.read_csv(train_path)
        dev = pd.read_csv(dev_path)
        test = pd.read_csv(test_path)
        return train, dev, test

View file

@ -0,0 +1,66 @@
from expo.utils import DATA_CONFIG
import os
import pandas as pd
from expo.evaluation.evaluation import evaluate_score
import datetime
import json
from expo.MCTS import create_initial_state
from expo.research_assistant import ResearchAssistant
class Experimenter:
    """Baseline experiment runner: repeated plain ResearchAssistant runs, scored and saved as JSON."""
    result_path : str = "results/base"
    data_config = DATA_CONFIG

    def __init__(self, args, **kwargs):
        self.args = args
        # Minute-resolution timestamp used in result and prediction file names.
        self.start_time = f"{datetime.datetime.now():%Y%m%d%H%M}"

    async def run_experiment(self):
        """Run ``args.num_experiments`` independent runs and save per-run plus summary results."""
        args = self.args
        state = create_initial_state(args.task, start_task_id=1, data_config=self.data_config, low_is_better=args.low_is_better, name="")
        requirement = state["requirement"]

        results = []
        for idx in range(args.num_experiments):
            assistant = ResearchAssistant(node_id="0", use_reflection=args.reflection)
            await assistant.run(requirement)
            extracted = await assistant.get_score()
            results.append({
                "idx": idx,
                "score_dict": self.evaluate(extracted, state),
                "user_requirement": requirement,
                "args": vars(args)
            })

        # The summary entry goes first, ahead of the per-run entries.
        scores = [entry["score_dict"]["test_score"] for entry in results]
        avg_score = sum(scores) / len(scores)
        best_score = min(scores) if args.low_is_better else max(scores)
        summary = {"avg_score": avg_score, "best_score": best_score, "best_score_idx": scores.index(best_score)}
        results.insert(0, summary)
        self.save_result(results)

    def evaluate_prediction(self, split, state):
        """Score ``<split>_predictions.csv`` against the split's target file.

        Also writes a timestamped copy of the predictions into
        ``state["node_dir"]``.
        """
        source_csv = os.path.join(state["work_dir"], state["task"], f"{split}_predictions.csv")
        os.makedirs(state["node_dir"], exist_ok=True)
        archive_csv = os.path.join(state["node_dir"], f"{self.start_time}-{split}_predictions.csv")
        target_csv = os.path.join(state["datasets_dir"][f"{split}_target"])

        predictions = pd.read_csv(source_csv)["target"]
        predictions.to_csv(archive_csv, index=False)
        truth = pd.read_csv(target_csv)["target"]
        return evaluate_score(predictions, truth, state["dataset_config"]["metric"])

    def evaluate(self, score_dict, state):
        """Add ``dev_score`` and ``test_score`` to ``score_dict`` in place and return it."""
        score_dict.update(
            dev_score=self.evaluate_prediction("dev", state),
            test_score=self.evaluate_prediction("test", state),
        )
        return score_dict

    def save_result(self, result):
        """Dump ``result`` as JSON to ``<result_path>/<exp_mode>-<task>_<start_time>.json``."""
        os.makedirs(self.result_path, exist_ok=True)
        target = f"{self.result_path}/{self.args.exp_mode}-{self.args.task}_{self.start_time}.json"
        with open(target, "w") as out:
            json.dump(result, out, indent=4)

45
expo/experimenter/mcts.py Normal file
View file

@ -0,0 +1,45 @@
from expo.experimenter import Experimenter
from expo.dataset import generate_task_requirement
from expo.MCTS import MCTS
from expo.evaluation.visualize_mcts import get_tree_text
class MCTSExperimenter(Experimenter):
    """Experimenter that runs an MCTS search and reports the best nodes found."""
    result_path : str = "results/mcts"

    async def run_experiment(self):
        """Run the search, print and optionally save the tree text, then save a result summary."""
        mcts = MCTS(root_node=None, max_depth=5)
        best_nodes = await mcts.search(self.args.task, self.data_config,
                                       low_is_better=self.args.low_is_better,
                                       load_tree=self.args.load_tree,
                                       reflection=self.args.reflection,
                                       rollouts=self.args.rollouts,
                                       name=self.args.name)
        best_node = best_nodes["global_best"]
        dev_best_node = best_nodes["dev_best"]

        text, num_generated_codes = get_tree_text(mcts.root_node)
        text += f"Generated {num_generated_codes} unique codes.\n"
        text += f"Best node: {best_node}, score: {best_node.raw_reward}\n"
        text += f"Dev best node: {dev_best_node}, score: {dev_best_node.raw_reward}\n"
        print(text)
        # The tree file is only written when rollouts were requested.
        if self.args.rollouts > 0:
            self.save_tree(text)

        results = {
            "best_node": best_node.id,
            "best_node_score": best_node.raw_reward,
            "dev_best_node": dev_best_node.id,
            "dev_best_node_score": dev_best_node.raw_reward,
            "num_generated_codes": num_generated_codes,
            "user_requirement": best_node.state["requirement"],
            "args": vars(self.args)
        }
        self.save_result(results)

    def save_tree(self, tree_text):
        """Write the rendered tree to ``<result_path>/<task>_tree_<name>.txt``."""
        # Imported locally because this module has no top-level `import os`.
        import os

        # save_tree runs before save_result (the only other place that
        # creates result_path), so the directory may not exist yet.
        os.makedirs(self.result_path, exist_ok=True)
        fpath = f"{self.result_path}/{self.args.task}_tree_{self.args.name}.txt"
        # Instructions are LLM-generated text; write utf-8 explicitly rather
        # than depending on the platform default encoding.
        with open(fpath, "w", encoding="utf-8") as f:
            f.write(tree_text)

View file

@ -0,0 +1,103 @@
# System message passed to the LLM in InstructionGenerator.generate_new_instruction.
REFLECTION_SYSTEM_MSG = "As a Kaggle grandmaster participating in a competition, you need to analyze your experience and propose evolutionary points that are more likely to improve the performance of baseline code."
# Prompt asking the LLM to rewrite one instruction using an insight.
# Placeholders: instruction, insights. The doubled braces are literal braces
# under str.format. The reply's "New Instruction" field is what
# generate_new_instruction reads back.
CHANGE_INSTRUCTION = """
# Original instruction
{instruction}
# Insights
{insights}
Rewrite the original instruction according to the insights
# Expected Output Hard Format
```json
{{
"Original Instruction": "original instruction",
"New Instruction": "new instruction"
}}
```
"""
import re
import random
import json
from metagpt.llm import LLM
from metagpt.schema import Message
from expo.utils import load_data_config, mcts_logger, clean_json_from_rsp
# Loaded once, at import time, via expo.utils.load_data_config().
DATA_CONFIG = load_data_config()
class InstructionGenerator:
    """Static helpers for loading an analysis pool and rewriting instructions with its insights."""
    data_config = DATA_CONFIG

    @staticmethod
    def load_json_data(json_dir):
        """Parse and return the JSON document stored at ``json_dir``."""
        with open(json_dir, "r") as handle:
            return json.load(handle)

    @staticmethod
    def _random_sample(analysis, num_samples):
        """Return ``num_samples`` distinct random entries from ``analysis``."""
        return random.sample(analysis, num_samples)

    @staticmethod
    def sample_instruction_set(data):
        """Pick one random entry per ``task_id``, ordered by ascending ``task_id``."""
        by_task = {}
        for entry in data:
            by_task.setdefault(entry["task_id"], []).append(entry)
        return [random.choice(by_task[key]) for key in sorted(by_task.keys())]

    @staticmethod
    def format_output(rsp):
        """Parse the JSON string ``rsp`` and wrap it as ``[{"Insights": <parsed>}]``."""
        return [{"Insights": json.loads(rsp)}]

    @staticmethod
    def load_analysis_pool(file_path, task_id=None):
        """Load the analysis pool, optionally keeping only entries for ``task_id``.

        Raises:
            ValueError: If any entry lacks a ``task_id`` key.
        """
        pool = InstructionGenerator.load_json_data(file_path)
        if any("task_id" not in entry for entry in pool):
            raise ValueError("task_id is not found in the analysis pool")
        if not task_id:
            return pool
        return [entry for entry in pool if int(entry["task_id"]) == int(task_id)]

    @staticmethod
    async def generate_new_instructions(task_id, original_instruction, max_num, file_path):
        """Rewrite ``original_instruction`` once per insight, using the first ``max_num`` for ``task_id``.

        Returns ``[original_instruction]`` when the pool has no insight for the task.
        """
        pool = InstructionGenerator.load_analysis_pool(file_path, task_id)
        if not pool:
            mcts_logger.log("MCTS", f"No insights available for task {task_id}")
            return [original_instruction]  # Return the original instruction if no insights are available
        # Awaited one after another, in pool order.
        return [
            await InstructionGenerator.generate_new_instruction(original_instruction, entry["Analysis"])
            for entry in pool[:max_num]
        ]

    @staticmethod
    async def generate_new_instruction(original_instruction, insights):
        """Ask the LLM to rewrite one instruction and return the reply's ``"New Instruction"`` field."""
        llm = LLM()
        prompt = CHANGE_INSTRUCTION.format(instruction=original_instruction, insights=insights)
        messages = llm.format_msg([Message(content=prompt, role="user")])
        reply = await llm.aask(messages, system_msgs=[REFLECTION_SYSTEM_MSG])
        parsed = json.loads(clean_json_from_rsp(reply))
        return parsed["New Instruction"]

View file

@ -0,0 +1,127 @@
import re
import random
import json
from metagpt.llm import LLM
from metagpt.schema import Message
from expo.utils import clean_json_from_rsp, load_data_config
# Loaded once, at import time, via expo.utils.load_data_config().
DATA_CONFIG = load_data_config()
# Prompt used by SolutionDesigner.generate_solutions to request per-task-type
# insights for a dataset. Placeholders: dataset, metadata, head. The doubled
# braces are literal braces under str.format. The reply is expected to be a
# JSON list of {"task_type", "insights"} objects, which is what
# SolutionDesigner.process_analysis_pool consumes.
DATASET_INSIGHT_PROMPT = """
# Dataset Description
{dataset}
# Dataset Metadata
{metadata}
# Dataset Head
{head}
# Instruction
Propose insights to help improve the performance of the model on this dataset.
The insights should be proposed based on the dataset description with different task types.
Each task type should have at least 5 insights.
Make sure each method is independent and can be implemented separately.
# Format
```json
[
{{
"task_type": "EDA",
"insights": [
"insight1",
"insight2",
"insight3",
...
"insightN"
]
}},
{{
"task_type": "Data Preprocessing",
"insights": [
"insight1",
"insight2",
"insight3",
...
"insightN"
]
}},
{{
"task_type": "Feature Engineering",
"insights": [
"insight1",
"insight2",
"insight3",
...
"insightN"
]
}},
{{
"task_type": "Model Training",
"insights": [
"insight1",
"insight2",
"insight3",
...
"insightN"
]
}}
]
```
"""
# Metadata keys that SolutionDesigner.metadata_builder copies into the prompt;
# keys missing from the input are reported as "N/A".
KEY_DATASET_FEATURES = [
'NumberOfClasses',
'NumberOfFeatures',
'NumberOfInstances',
'NumberOfInstancesWithMissingValues',
'NumberOfMissingValues',
'NumberOfNumericFeatures',
'NumberOfSymbolicFeatures'
]
# Maps a task_type name from the LLM reply to the task_id stored with each
# insight. NOTE(review): "Model Evaluation" has an id here but is not among
# the task types listed in DATASET_INSIGHT_PROMPT — confirm whether intended.
TASK_TO_ID = {
"EDA": 1,
"Data Preprocessing": 2,
"Feature Engineering": 3,
"Model Training": 4,
"Model Evaluation": 5
}
class SolutionDesigner:
    """Asks an LLM for dataset-specific insights and stores them as an analysis pool."""
    data_dir : str= DATA_CONFIG["datasets_dir"]

    async def generate_solutions(self, dataset_info, dataset_name):
        """Generate insights for one dataset and write ``ds_analysis_pool.json`` under its directory."""
        llm = LLM()
        prompt = DATASET_INSIGHT_PROMPT.format(
            dataset=dataset_info["description"],
            metadata=self.metadata_builder(dataset_info["metadata"]),
            head=dataset_info["df_head"],
        )
        reply = await llm.aask(prompt)
        parsed = json.loads(clean_json_from_rsp(reply))
        pool = self.process_analysis_pool(parsed)
        self.save_analysis_pool(f"{self.data_dir}/{dataset_name}", pool)

    def process_analysis_pool(self, insights_rsp):
        """Flatten the per-task-type reply into a list of ``Analysis``/``Category``/``task_id`` dicts."""
        pool = []
        for group in insights_rsp:
            category = group["task_type"]
            pool.extend(
                {"Analysis": insight, "Category": category, "task_id": TASK_TO_ID[category]}
                for insight in group["insights"]
            )
        return pool

    def metadata_builder(self, qualities):
        """Return the key dataset features as indented JSON text, using ``"N/A"`` for missing keys."""
        selected = {key: qualities.get(key, "N/A") for key in KEY_DATASET_FEATURES}
        return json.dumps(selected, indent=4)

    def save_analysis_pool(self, dataset_path, analysis_pool):
        """Write ``analysis_pool`` as JSON to ``<dataset_path>/ds_analysis_pool.json``."""
        target = f"{dataset_path}/ds_analysis_pool.json"
        with open(target, "w") as out:
            json.dump(analysis_pool, out, indent=4)

5
expo/requirements.txt Normal file
View file

@ -0,0 +1,5 @@
# expo
openml==0.14.2
# ml module to run in DI
xgboost
catboost

150
expo/research_assistant.py Normal file
View file

@ -0,0 +1,150 @@
from __future__ import annotations
import json
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.schema import Message, Task, TaskResult
from metagpt.strategy.task_type import TaskType
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.utils.common import CodeParser
from metagpt.utils.common import write_json_file, read_json_file, format_trackback_info
from metagpt.const import MESSAGE_ROUTE_TO_ALL, SERDESER_PATH
from expo.utils import mcts_logger, save_notebook
from pydantic import Field, model_validator
from metagpt.actions.di.write_analysis_code import CheckData, WriteAnalysisCode
import re
import os
# Prompt used by ResearchAssistant.llm_extract_score to pull train/dev/test
# scores out of a task's code and execution result. Placeholders: code,
# result. The doubled braces are literal braces under str.format.
EXTRACT_SCORE_PROMPT = """
# Code:
{code}
# Execution Result:
{result}
# Instruction:
Based on the code and execution result, please extract the scores and return it as a dictionary.
If you cannot find the scores, please still return a dictionary with the keys 'train_score', 'dev_score', and 'test_score', and set the values to -1.
# Format:
```json
{{
"train_score": x.x,
"dev_score": x.x,
"test_score": x.x
}}
```
"""
class ResearchAssistant(DataInterpreter):
    """DataInterpreter role used as one node of the experiment search.

    On top of the base role it carries a node id, a ``start_task_id``
    identifying the plan task whose instruction can be replaced, one-shot
    persistence of its state and notebook to ``role_dir``, and LLM-based
    extraction of scores from a task's code and result.
    """
    # Identifier used to name saved artifacts (see get_node_name).
    node_id: str = "0"
    # Id of the plan task that the instruction-editing helpers operate on.
    start_task_id: int = 1
    # Becomes True after the first non-static save_state() call.
    state_saved : bool = False
    # Directory that save_state() and save_notebook() write into.
    # NOTE(review): annotated as str but the default comes from
    # SERDESER_PATH.joinpath(...) — confirm the intended type.
    role_dir : str = SERDESER_PATH.joinpath("team", "environment", "roles", f"Experimenter")

    def get_node_name(self):
        """Return the artifact name for this node, ``Node-<node_id>``."""
        return f"Node-{self.node_id}"

    def get_next_instruction(self):
        """Return the plan task at list position ``start_task_id``.

        NOTE(review): this indexes ``plan.tasks`` by position, whereas
        change_next_instruction looks up ``task_map[str(start_task_id)]`` by
        id — confirm that both are meant to address the same task.
        """
        return self.planner.plan.tasks[self.start_task_id]

    def change_next_instruction(self, new_instruction):
        """Replace the instruction of task ``start_task_id`` and rebuild the task list.

        Does nothing when ``new_instruction`` is None.
        """
        if new_instruction is not None:
            self.planner.plan.task_map[str(self.start_task_id)].instruction = new_instruction
            self.remap_tasks()

    def update_til_start_task(self, role: ResearchAssistant, backward: bool = True):
        """Synchronise this role's plan with ``role``'s plan.

        With ``backward=True`` this role's ``start_task_id`` must be one less
        than ``role``'s; task ``start_task_id`` is copied from ``role`` unless
        the instruction check fails. With ``backward=False`` it must be one
        greater; tasks are copied from ``role`` only if ``role``'s current
        task id is beyond this role's ``start_task_id``.
        """
        if backward:
            # make sure the previous task instructions are matched
            assert self.start_task_id == role.start_task_id - 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
            # NOTE(review): the loop variable `i` is unused, so every
            # iteration compares the same task (start_task_id) — confirm
            # whether a per-task comparison over earlier ids was intended.
            for i in range(self.start_task_id):
                if self.planner.plan.task_map[str(self.start_task_id)].instruction != role.planner.plan.task_map[str(self.start_task_id)].instruction:
                    mcts_logger.info("Previous task instructions not matched")
                    self.remap_tasks()
                    return
            # copy new role's task (self.start_task_id) to current role
            self.planner.plan.task_map[str(self.start_task_id)] = role.planner.plan.task_map[str(self.start_task_id)].model_copy()
            self.remap_tasks()
        else:
            assert self.start_task_id == role.start_task_id + 1, f"start_task_id: {self.start_task_id}, role.start_task_id: {role.start_task_id}"
            if int(role.planner.plan.current_task_id) > self.start_task_id:
                # NOTE(review): this starts at key "0", while
                # llm_extract_score treats str(len(task_map)) as a valid key,
                # which suggests ids start at 1 — confirm "0" exists.
                for i in range(role.start_task_id):
                    self.planner.plan.task_map[str(i)] = role.planner.plan.task_map[str(i)].model_copy()
            self.remap_tasks()

    async def get_score(self):
        """Extract scores via the LLM and add ``score`` as a copy of ``dev_score``."""
        score_dict = await self.llm_extract_score()
        score_dict["score"] = score_dict["dev_score"]
        return score_dict

    async def llm_extract_score(self):
        """Ask the LLM to extract scores from one task's code and result.

        The task read is ``task_map[str(len(task_map))]`` — presumably the
        last task, assuming ids run 1..N (TODO confirm). Returns the parsed
        JSON dict from the reply.
        """
        result_text = self.planner.plan.task_map[str(len(self.planner.plan.task_map))].result
        code_text = self.planner.plan.task_map[str(len(self.planner.plan.task_map))].code
        # `role="user"` is handed to str.format, not to aask; the template has
        # no {role} placeholder, so it has no effect.
        rsp = await self.llm.aask(EXTRACT_SCORE_PROMPT.format(code=code_text, result=result_text, role="user"))
        json_block = CodeParser.parse_code(block=None, text=rsp)
        score_dict = json.loads(json_block)
        return score_dict

    @model_validator(mode="after")
    def set_plan_and_tool(self) -> "Interpreter":
        """Skip the parent's plan/tool initialisation when a plan goal is already set.

        With a non-empty goal, only the action list and state are set; with
        an empty goal the parent validator runs.
        """
        if self.planner.plan.goal != '':
            self.set_actions([WriteAnalysisCode])
            self._set_state(0)
            print("Plan already exists, skipping initialization.")
            return self
        print("Initializing plan and tool...")
        return super().set_plan_and_tool()

    async def _act_on_task(self, current_task: Task) -> TaskResult:
        """Useful in 'plan_and_act' mode. Wrap the output in a TaskResult for review and confirmation.

        After the task with id ``start_task_id + 1`` has executed, the role
        state and notebook are also saved.
        """
        mcts_logger.info(f"The current_task is: {current_task}")
        code, result, is_success = await self._write_and_exec_code()
        task_result = TaskResult(code=code, result=result, is_success=is_success)
        if int(current_task.task_id) == self.start_task_id + 1:
            # fe_id = current_task.dependent_task_ids
            self.save_state()
            save_notebook(role=self, save_dir=self.role_dir, name=self.get_node_name())
        return task_result

    def save_state(self, static_save=False):
        """Dump the role to ``<role_dir>/<node name>.json``.

        A normal save happens at most once per role (guarded by
        ``state_saved``); ``static_save=True`` always writes and leaves the
        flag untouched.
        """
        if self.state_saved and not static_save:
            return
        if not static_save:
            self.state_saved = True
            mcts_logger.log("MCTS", f"Saving state at task {self.start_task_id}")
        else:
            mcts_logger.log("MCTS", f"Static Saving")
        stg_path = self.role_dir
        name = self.get_node_name()
        role_path = os.path.join(stg_path, f"{name}.json")
        # Save the state as a JSON file
        write_json_file(role_path, self.model_dump())

    def remap_tasks(self):
        """Rebuild ``plan.tasks`` from ``plan.task_map`` in sorted key order.

        NOTE(review): keys are looked up as ``str(...)`` elsewhere in this
        class, so the sort appears to be lexicographic ("10" before "2") —
        confirm plans never reach ten tasks or that this is acceptable.
        """
        self.planner.plan.tasks = [self.planner.plan.task_map[task_id] for task_id in sorted(self.planner.plan.task_map.keys())]

    async def run(self, with_message=None) -> Message | None:
        """Observe, and think and act based on the results of the observation.

        When ``with_message == "continue"``, working memory is cleared and
        the role reacts directly without going through the parent's run();
        any other value is delegated to the parent.
        """
        if with_message == "continue":
            # self.set_todo(None)
            # working_memory = self.working_memory
            # self.remap_tasks()
            mcts_logger.info("Continue to run")
            self.rc.working_memory.clear()
            self.working_memory.clear()
            # self.rc.todo = WriteAnalysisCode()
            rsp = await self.react()
            # Send the response message to the Environment object so that it can relay the message to subscribers
            self.set_todo(None)
            self.publish_message(rsp)
            return rsp
        return await super().run(with_message)

0
expo/results/PLACEHOLDER Normal file
View file

0
expo/results/tree/TREE Normal file
View file

View file

@ -0,0 +1,97 @@
import os
from expo.research_assistant import ResearchAssistant
import asyncio
from expo.utils import DATA_CONFIG, get_exp_pool_path
from expo.dataset import generate_task_requirement
from expo.insights.instruction_generator import InstructionGenerator
from expo.MCTS import create_initial_state
from expo.evaluation.evaluation import evaluate_score
import json
import argparse
import pandas as pd
import datetime
# Suffix appended to the user requirement in main(); `experience` receives
# the sampled insight text.
EXPS_PROMPT = """
When doing the tasks, you can refer to the insights below:
{experience}
"""
# Module-level alias of the shared data configuration from expo.utils.
data_config = DATA_CONFIG
def evaluate_test(score, state):
    """Score the task's ``predictions.csv`` on the test split and record it in ``score``.

    A timestamped copy of the predictions is written under ``results/``.

    Returns:
        The same ``score`` mapping, with ``"test_score"`` set.
    """
    stamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
    task = state["task"]
    predictions = pd.read_csv(os.path.join(state["work_dir"], task, "predictions.csv"))["target"]
    # keep a timestamped copy of the predictions under results/
    predictions.to_csv(os.path.join("results", f"{task}-{stamp}-predictions.csv"), index=False)
    # ground truth for the test split
    truth = pd.read_csv(os.path.join(state["datasets_dir"]["test_target"]))["target"]
    score["test_score"] = evaluate_score(predictions, truth, state["dataset_config"]["metric"])
    return score
async def main(task_name, use_reflection=True, mode="single", num_experiments=2):
    """Run insight-augmented experiments for one task and save scores to ``results/``.

    Args:
        task_name: Task key used to build the state, requirement and pool path.
        use_reflection: Passed to each ResearchAssistant.
        mode: ``"single"`` samples one insight per experiment; ``"set"``
            samples one instruction set and reuses it for every experiment.
        num_experiments: Number of runs.

    Raises:
        ValueError: If ``mode`` is neither ``"single"`` nor ``"set"``.
    """
    low_is_better = False
    state = create_initial_state(task_name, start_task_id=1, data_config=data_config, low_is_better=low_is_better, name="")
    user_requirement = generate_task_requirement(task_name, data_config)
    exp_pool_path = get_exp_pool_path(task_name, data_config, pool_name="ds_analysis_pool")
    exp_pool = InstructionGenerator.load_analysis_pool(exp_pool_path)
    if mode == "single":
        exps = InstructionGenerator._random_sample(exp_pool, num_experiments)
        exps = [exp["Analysis"] for exp in exps]
    elif mode == "set":
        exp_set = InstructionGenerator.sample_instruction_set(exp_pool)
        exp_set_text = "\n".join([f"{exp['task_id']}: {exp['Analysis']}" for exp in exp_set])
        exps = [exp_set_text] * num_experiments
    else:
        raise ValueError(f"Invalid mode: {mode}")

    scores = []
    for i in range(num_experiments):
        di = ResearchAssistant(node_id=str(i), use_reflection=use_reflection)
        di.role_dir = f"{di.role_dir}_{task_name}"
        requirement = user_requirement + EXPS_PROMPT.format(experience=exps[i])
        print(requirement)
        await di.run(requirement)
        # ResearchAssistant.get_score() accepts no arguments; the previous
        # `low_is_better=False` keyword raised TypeError.
        score = await di.get_score()
        score = evaluate_test(score, state)
        scores.append(score)

    with open(f"results/{task_name}_scores.json", "w") as f:
        # save scores and corresponding insights
        results = {"avg_score": sum(score["test_score"] for score in scores if score)/num_experiments,
                   "max_score": max(score["test_score"] for score in scores),
                   "scores": scores, "insights": exps}
        json.dump(results, f, indent=4)
def parse_args():
    """Parse this script's command-line options.

    Returns:
        argparse.Namespace with ``task``, ``use_reflection``, ``mode`` and
        ``num_experiments``.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--task", type=str, default="titanic")
    # Paired on/off switches share one destination; reflection is on by default.
    for flag, action in (("--use_reflection", "store_true"), ("--no_use_reflection", "store_false")):
        cli.add_argument(flag, dest="use_reflection", action=action)
    cli.set_defaults(use_reflection=True)
    cli.add_argument("--mode", type=str, default="single")
    cli.add_argument("--num_experiments", type=int, default=2)
    return cli.parse_args()
if __name__ == "__main__":
    # Script entry point: read the CLI options, then drive the async main() to completion.
    args = parse_args()
    asyncio.run(
        main(
            args.task,
            use_reflection=args.use_reflection,
            mode=args.mode,
            num_experiments=args.num_experiments,
        )
    )

51
expo/run_experiment.py Normal file
View file

@ -0,0 +1,51 @@
from expo.experimenter import MCTSExperimenter, Experimenter, AugExperimenter, CustomExperimenter
import asyncio
import argparse
def get_args():
    """Build the experiment runner's CLI parser and parse the process arguments.

    The general options (``--name``, ``--exp_mode``) are declared here; the
    remaining option groups are registered by the sibling helper functions.

    Returns:
        argparse.Namespace holding all parsed options.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", type=str, default="")
    parser.add_argument("--exp_mode", type=str, default="mcts", choices=["mcts", "aug", "base", "custom"])
    # Each helper adds its own group of options to the shared parser.
    for register_options in (get_di_args, get_mcts_args, get_aug_exp_args):
        register_options(parser)
    return parser.parse_args()
def get_mcts_args(parser):
    """Register the MCTS-related options on ``parser``.

    Adds ``--load_tree`` / ``--no_load_tree`` (default: False) and
    ``--rollouts`` (int, default: 5).
    """
    # Paired on/off switches writing to the same destination.
    for flag, action in (("--load_tree", "store_true"), ("--no_load_tree", "store_false")):
        parser.add_argument(flag, dest="load_tree", action=action)
    parser.set_defaults(load_tree=False)
    parser.add_argument("--rollouts", type=int, default=5)
def get_aug_exp_args(parser):
    """Register the augmentation-experiment options on ``parser``.

    Adds ``--aug_mode`` ("single" or "set", default "single") and
    ``--num_experiments`` (int, default 1).
    """
    option_specs = {
        "--aug_mode": dict(type=str, default="single", choices=["single", "set"]),
        "--num_experiments": dict(type=int, default=1),
    }
    for flag, spec in option_specs.items():
        parser.add_argument(flag, **spec)
def get_di_args(parser):
    """Register the task-level options on ``parser``.

    Adds ``--task`` (default "titanic"), ``--low_is_better`` (default False)
    and ``--reflection`` / ``--no_reflection`` (default True).
    """
    parser.add_argument("--task", type=str, default="titanic")
    parser.add_argument("--low_is_better", dest="low_is_better", action="store_true")
    parser.set_defaults(low_is_better=False)
    # Paired on/off switches writing to the same destination.
    for flag, action in (("--reflection", "store_true"), ("--no_reflection", "store_false")):
        parser.add_argument(flag, dest="reflection", action=action)
    parser.set_defaults(reflection=True)
async def main(args):
    """Instantiate the experimenter selected by ``args.exp_mode`` and run it.

    Args:
        args: Parsed CLI namespace; it is passed unchanged to the experimenter
            constructor.

    Raises:
        ValueError: If ``args.exp_mode`` is not one of "mcts", "aug", "base"
            or "custom".
    """
    selected_mode = args.exp_mode
    # Pick the class first, then construct it in a single place.
    if selected_mode == "mcts":
        experimenter_cls = MCTSExperimenter
    elif selected_mode == "aug":
        experimenter_cls = AugExperimenter
    elif selected_mode == "base":
        experimenter_cls = Experimenter
    elif selected_mode == "custom":
        experimenter_cls = CustomExperimenter
    else:
        raise ValueError(f"Invalid exp_mode: {args.exp_mode}")
    experimenter = experimenter_cls(args)
    await experimenter.run_experiment()
if __name__ == "__main__":
    # Script entry point: parse the CLI once and hand the namespace to the async driver.
    asyncio.run(main(get_args()))

53
expo/run_mcts.py Normal file
View file

@ -0,0 +1,53 @@
from expo.MCTS import MCTS, Node, initialize_di_root_node
from expo.utils import load_data_config
from expo.dataset import generate_task_requirement
from expo.evaluation.visualize_mcts import get_tree_text
import asyncio
import argparse
def get_args():
    """Parse the command-line options of the standalone MCTS runner.

    Returns:
        argparse.Namespace with ``task``, ``low_is_better``, ``load_tree``,
        ``reflection``, ``rollouts`` and ``name``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--task", type=str, default="titanic")
    parser.add_argument("--low_is_better", dest="low_is_better", action="store_true")
    parser.set_defaults(low_is_better=False)
    # Paired on/off switches; both destinations default to True in this script.
    switch_pairs = (
        ("load_tree", "--load_tree", "--no_load_tree"),
        ("reflection", "--reflection", "--no_reflection"),
    )
    for dest, on_flag, off_flag in switch_pairs:
        parser.add_argument(on_flag, dest=dest, action="store_true")
        parser.add_argument(off_flag, dest=dest, action="store_false")
        parser.set_defaults(**{dest: True})
    parser.add_argument("--rollouts", type=int, default=3)
    parser.add_argument("--name", type=str, default="")
    return parser.parse_args()
# Parsed once at import time; passed to the search below.
data_config = load_data_config()

if __name__ == "__main__":
    import os

    args = get_args()

    # The root node is not supplied here (root_node=None); the tree is
    # inspected afterwards through mcts.root_node.
    mcts = MCTS(root_node=None, max_depth=5)
    best_nodes = asyncio.run(
        mcts.search(
            args.task,
            data_config,
            low_is_better=args.low_is_better,
            load_tree=args.load_tree,
            reflection=args.reflection,
            rollouts=args.rollouts,
            name=args.name,
        )
    )
    best_node = best_nodes["global_best"]
    dev_best_node = best_nodes["dev_best"]

    text, num_generated_codes = get_tree_text(mcts.root_node)
    print(text)
    print(f"Generated {num_generated_codes} unique codes.")

    # The results directory is not guaranteed to exist; create it so the
    # summary of a finished search is not lost to a FileNotFoundError.
    os.makedirs("results", exist_ok=True)
    with open(f"results/{args.task}_tree{args.name}.txt", "w") as f:
        f.write(f"Generated {num_generated_codes} unique codes.\n")
        f.write(f"Best node: {best_node}, score: {best_node.raw_reward}\n")
        f.write(f"Dev best node: {dev_best_node}, score: {dev_best_node.raw_reward}\n")
        f.write(text)

109
expo/utils.py Normal file
View file

@ -0,0 +1,109 @@
import yaml
from metagpt.roles.role import Role
from metagpt.actions.di.execute_nb_code import ExecuteNbCode
# from nbclient import NotebookClient
from nbformat.notebooknode import NotebookNode
import nbformat
from pathlib import Path
from loguru import logger as _logger
from datetime import datetime
import sys
import os
import re
def load_data_config(file_path="data.yaml"):
    """Read a YAML file and return its parsed content.

    Args:
        file_path: Path of the YAML file; the default "data.yaml" is resolved
            against the current working directory.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file.
    """
    with open(file_path, 'r') as config_file:
        return yaml.safe_load(config_file)
# Parsed once at import time from the default "data.yaml", which is resolved
# against the current working directory.
DATA_CONFIG = load_data_config()
def get_mcts_logger():
    """Configure the loguru logger with a custom "MCTS" level and return it.

    Existing handlers are removed, a level named "MCTS" (severity 25, green)
    is registered, and two sinks are added:

    * stderr at level "INFO";
    * a text file named after today's date (``YYYYMMDD.txt``) inside
      ``DATA_CONFIG["work_dir"] / DATA_CONFIG["role_dir"]`` at level "MCTS".

    Returns:
        The configured loguru logger object.
    """
    print_level = "INFO"
    logfile_level = "MCTS"
    # One log file per calendar day.
    log_name = datetime.now().strftime("%Y%m%d")

    _logger.remove()
    _logger.level(logfile_level, color="<green>", no=25)
    # A single stderr sink is enough: the "INFO" threshold already lets records
    # at the higher "MCTS" severity through. A second stderr sink at "MCTS"
    # would print each of those records twice.
    _logger.add(sys.stderr, level=print_level)
    _logger.add(Path(DATA_CONFIG["work_dir"]) / DATA_CONFIG["role_dir"] / f"{log_name}.txt", level=logfile_level)
    # NOTE(review): `propagate` looks carried over from the stdlib logging API;
    # confirm whether loguru gives this attribute any meaning.
    _logger.propagate = False
    return _logger
# Shared logger instance, configured once when this module is imported.
mcts_logger = get_mcts_logger()
def get_exp_pool_path(task_name, data_config, pool_name="analysis_pool"):
    """Return the path of a task's experience-pool JSON file.

    Args:
        task_name: Key into ``data_config["datasets"]``.
        data_config: Mapping providing ``datasets_dir`` and ``datasets``.
        pool_name: File stem of the pool; ".json" is appended.

    Returns:
        ``<datasets_dir>/<dataset folder>/<pool_name>.json`` as a string.

    Raises:
        ValueError: If ``task_name`` is not listed in ``data_config["datasets"]``.
    """
    root_dir = data_config['datasets_dir']
    known_datasets = data_config['datasets']
    if task_name not in known_datasets:
        raise ValueError(f"Dataset {task_name} not found in config file. Available datasets: {data_config['datasets'].keys()}")
    dataset_folder = known_datasets[task_name]['dataset']
    return os.path.join(root_dir, dataset_folder, f"{pool_name}.json")
def change_plan(role, plan):
    """Attach ``plan`` to the first task of the role's plan that has no code yet.

    Args:
        role: Object exposing ``planner.plan.tasks``.
        plan: Value stored on the ``plan`` attribute of the first task whose
            ``code`` is falsy.

    Returns:
        True when every task already has code (nothing is modified),
        False when a task was found and updated.
    """
    print(f"Change next plan to: {plan}")
    next_pending = next((task for task in role.planner.plan.tasks if not task.code), None)
    if next_pending is None:
        return True
    next_pending.plan = plan
    return False
def is_cell_to_delete(cell: NotebookNode) -> bool:
    """Tell whether a notebook cell has an output that contains a traceback.

    Returns:
        True if the cell has an "outputs" entry and at least one non-empty
        output in it contains the key "traceback"; False otherwise.
    """
    if "outputs" not in cell:
        return False
    return any(out and "traceback" in out for out in cell["outputs"])
def process_cells(nb: NotebookNode) -> NotebookNode:
    """Keep only the code cells without a traceback and renumber them from 1.

    The notebook is modified in place: ``nb["cells"]`` is replaced by the
    filtered list and each kept cell gets a consecutive ``execution_count``.

    Returns:
        The same notebook object that was passed in.
    """
    kept_cells = [
        cell
        for cell in nb["cells"]
        if cell["cell_type"] == "code" and not is_cell_to_delete(cell)
    ]
    for position, cell in enumerate(kept_cells, start=1):
        cell["execution_count"] = position
    nb["cells"] = kept_cells
    return nb
def save_notebook(role: Role, save_dir: str = "", name: str = ""):
    """Write the role's cleaned notebook to ``<save_dir>/<name>.ipynb``.

    The notebook held by ``role.execute_code`` is passed through
    ``process_cells`` before being written with ``nbformat.write``.
    """
    target_dir = Path(save_dir)
    cleaned_nb = process_cells(role.execute_code.nb)
    nbformat.write(cleaned_nb, target_dir / f"{name}.ipynb")
async def load_execute_notebook(role):
    """Run the code of every planned task through the role's executor.

    Tasks whose ``code`` is empty are skipped; the rest are executed in plan
    order and each result is printed.

    Returns:
        ``role.execute_code``, the executor the code was run on.
    """
    # Snapshot the non-empty code snippets before anything is executed.
    pending_codes = list(filter(None, (task.code for task in role.planner.plan.tasks)))
    executor = role.execute_code
    # await executor.build()
    for snippet in pending_codes:
        outputs, success = await executor.run(snippet)
        print(f"Execution success: {success}, Output: {outputs}")
    print("Finish executing the loaded notebook")
    return executor
def clean_json_from_rsp(text):
    """Extract the content of every ```json fenced block in ``text``.

    Returns:
        The bodies of all matching blocks joined with newlines, or an empty
        string when ``text`` contains no such block.
    """
    fenced_bodies = re.findall(r"```json(.*?)```", text, re.DOTALL)
    # Joining an empty list already yields "", which covers the no-match case.
    return "\n".join(fenced_bodies)

View file

@ -69,7 +69,7 @@ Output a json following the format:
```json
{{
"reflection": str = "Reflection on previous implementation",
"improved_impl": str = "Refined code after reflection.",
"improved_impl": str = "Refined code after reflection (do not include nested code block here).",
}}
```
"""

View file

@ -25,7 +25,7 @@ The current task is about feature engineering. when performing it, please adhere
- Use available feature engineering tools if they are potential impactful.
- Avoid creating redundant or excessively numerous features in one step.
- Exclude ID columns from feature generation and remove them.
- Each feature engineering operation performed on the train set must also applies to the test separately at the same time.
- Each feature engineering operation performed on the train set must also applies to the dev/test separately at the same time.
- Avoid using the label column to create features, except for cat encoding.
- Use the data from previous task result if exist, do not mock or reload data yourself.
- Always copy the DataFrame before processing it and use the copy to process.

View file

@ -478,10 +478,10 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
async def _plan_and_act(self) -> Message:
"""first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ... Use llm to come up with the plan dynamically."""
# create initial plan and update it until confirmation
goal = self.rc.memory.get()[-1].content # retreive latest user requirement
await self.planner.update_plan(goal=goal)
if not self.planner.plan.goal:
# create initial plan and update it until confirmation
goal = self.rc.memory.get()[-1].content # retreive latest user requirement
await self.planner.update_plan(goal=goal)
# take on tasks until all finished
while self.planner.current_task: