Merge branch 'dev' into dev_make_tools

This commit is contained in:
刘棒棒 2023-12-21 10:36:13 +08:00
commit d72b457c83
7 changed files with 316 additions and 62 deletions

1
.gitignore vendored
View file

@ -165,4 +165,3 @@ tmp
output.wav
metagpt/roles/idea_agent.py
.aider*
/tests/metagpt/actions/check_data.py

View file

@ -46,9 +46,12 @@ class ExecuteCode(ABC):
class ExecutePyCode(ExecuteCode, Action):
"""execute code, return result to llm, and display it."""
def __init__(self, name: str = "python_executor", context=None, llm=None):
def __init__(self, name: str = "python_executor", context=None, llm=None, nb=None):
super().__init__(name, context, llm)
self.nb = nbformat.v4.new_notebook()
if nb is None:
self.nb = nbformat.v4.new_notebook()
else:
self.nb = nb
self.nb_client = NotebookClient(self.nb)
self.console = Console()
self.interaction = "ipython" if self.is_ipython() else "terminal"

View file

@ -210,6 +210,8 @@ class WriteCodeWithTools(BaseWriteAnalysisCode):
module_name=module_name,
tool_catalog=tool_catalog,
)
else:
prompt = GENERATE_CODE_PROMPT.format(
user_requirement=plan.goal,

View file

@ -106,7 +106,7 @@ class WriteCodeSteps(Action):
def process_task(task):
task_dict = task.dict()
# ptask = {k: task_dict[k] for k in task_dict if k in select_task_keys }
ptask = f"task_id_{task_dict['task_id']}:{task_dict['instruction']}\n"
ptask = f"task_id_{task_dict['task_id']}:{task_dict['instruction']}"
return ptask

View file

@ -1,4 +1,4 @@
from typing import List
from typing import List
import json
from datetime import datetime
@ -25,6 +25,7 @@ from metagpt.roles.kaggle_manager import DownloadData, SubmitResult
from metagpt.schema import Message, Plan
from metagpt.utils.common import remove_comments, create_func_config
from metagpt.utils.save_code import save_code_file
from metagpt.utils.recovery_util import save_history, load_history
class UpdateDataColumns(Action):
@ -45,21 +46,21 @@ class MLEngineer(Role):
super().__init__(name=name, profile=profile, goal=goal)
self._set_react_mode(react_mode="plan_and_act")
self._watch([DownloadData, SubmitResult])
self.plan = Plan(goal=goal)
self.use_tools = False
self.make_udfs = False # user-defined functions
self.use_udfs = False
self.use_code_steps = False
self.use_tools = True
self.use_code_steps = True
self.execute_code = ExecutePyCode()
self.auto_run = auto_run
self.data_desc = {}
# memory for working on each task, discarded each time a task is done
self.working_memory = Memory()
async def _plan_and_act(self):
### Actions in a multi-agent multi-turn setting ###
memories = self.get_memories()
if memories:
@ -69,29 +70,29 @@ class MLEngineer(Role):
elif latest_event == SubmitResult:
# self reflect on previous plan outcomes and think about how to improve the plan, add to working memory
await self._reflect()
# get feedback for improvement from human, add to working memory
await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
### Common Procedure in both single- and multi-agent setting ###
# create initial plan and update until confirmation
await self._update_plan()
while self.plan.current_task:
task = self.plan.current_task
logger.info(f"ready to take on task {task}")
# take on current task
code, result, success = await self._write_and_exec_code()
# ask for acceptance, users can other refuse and change tasks in the plan
review, task_result_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
if self.auto_run:
# if human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds;
# if auto mode, then the code run has to succeed for the task to be considered completed
task_result_confirmed = success
if task_result_confirmed:
# tick off this task and record progress
task.code = code
@ -103,9 +104,9 @@ class MLEngineer(Role):
success, new_code = await self._update_data_columns()
if success:
task.code = task.code + "\n\n" + new_code
confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower()
and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)"
and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)"
if confirmed_and_more:
self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
await self._update_plan(review)
@ -114,23 +115,23 @@ class MLEngineer(Role):
# Ask the Role to redo this task with help of review feedback,
# useful when the code run is successful but the procedure or result is not what we want
continue
else:
# update plan according to user's feedback and to take on changed tasks
await self._update_plan(review)
completed_plan_memory = self.get_useful_memories() # completed plan as a outcome
self._rc.memory.add(completed_plan_memory[0]) # add to persistent memory
summary = await SummarizeAnalysis().run(self.plan)
rsp = Message(content=summary, cause_by=SummarizeAnalysis)
self._rc.memory.add(rsp)
# save code using datetime.now or keywords related to the goal of your project (plan.goal).
project_record = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb")
return rsp
async def _update_data_columns(self):
rsp = await UpdateDataColumns().run(self.plan)
is_update, code = rsp["is_update"], rsp["code"]
@ -140,25 +141,20 @@ class MLEngineer(Role):
if success:
self.data_desc["column_info"] = result
return success, code
async def _write_and_exec_code(self, max_retry: int = 3):
self.plan.current_task.code_steps = (
await WriteCodeSteps().run(self.plan)
if self.use_code_steps
else ""
)
counter = 0
success = False
debug_context = []
while not success and counter < max_retry:
context = self.get_useful_memories()
# print("*" * 10)
# print(context)
# print("*" * 10)
# breakpoint()
if counter > 0 and (self.use_tools or self.use_udfs):
logger.warning('We got a bug code, now start to debug...')
code = await DebugCode().run(
@ -203,11 +199,11 @@ class MLEngineer(Role):
)
debug_context = tool_context
cause_by = WriteCodeWithTools
self.working_memory.add(
Message(content=code, role="assistant", cause_by=cause_by)
)
result, success = await self.execute_code.run(code)
print(result)
# make tools for successful code and long code.
@ -217,20 +213,20 @@ class MLEngineer(Role):
self.working_memory.add(
Message(content=result, role="user", cause_by=ExecutePyCode)
)
if "!pip" in code:
success = False
counter += 1
if not success and counter >= max_retry:
logger.info("coding failed!")
review, _ = await self._ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
if ReviewConst.CHANGE_WORD[0] in review:
counter = 0 # redo the task again with help of human suggestions
return code, result, success
async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER):
auto_run = auto_run or self.auto_run
if not auto_run:
@ -240,7 +236,7 @@ class MLEngineer(Role):
self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
return review, confirmed
return "", True
async def _update_plan(self, review: str = "", max_tasks: int = 3, max_retries: int = 3):
plan_confirmed = False
while not plan_confirmed:
@ -251,7 +247,7 @@ class MLEngineer(Role):
self.working_memory.add(
Message(content=rsp, role="assistant", cause_by=WritePlan)
)
# precheck plan before asking reviews
is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan)
if not is_plan_valid and max_retries > 0:
@ -260,23 +256,21 @@ class MLEngineer(Role):
self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan))
max_retries -= 1
continue
_, plan_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)
update_plan_from_rsp(rsp, self.plan)
self.working_memory.clear()
async def _reflect(self):
context = self.get_memories()
context = "\n".join([str(msg) for msg in context])
# print("*" * 10)
# print(context)
# print("*" * 10)
reflection = await Reflect().run(context=context)
self.working_memory.add(Message(content=reflection, role="assistant"))
self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user"))
def get_useful_memories(self, task_exclude_field=None) -> List[Message]:
"""find useful memories only to reduce context length and improve performance"""
# TODO dataset description , code steps
@ -293,7 +287,7 @@ class MLEngineer(Role):
user_requirement=user_requirement, data_desc=data_desc, tasks=tasks, current_task=current_task
)
context_msg = [Message(content=context, role="user")]
return context_msg + self.get_working_memories()
def get_working_memories(self) -> List[Message]:
@ -334,22 +328,74 @@ class MLEngineer(Role):
if __name__ == "__main__":
requirement = "Run data analysis on sklearn Iris dataset, include a plot"
# requirement = "Run data analysis on sklearn Iris dataset, include a plot"
# requirement = "Run data analysis on sklearn Diabetes dataset, include a plot"
# requirement = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy"
# requirement = "Run data analysis on sklearn Wisconsin Breast Cancer dataset, include a plot, train a model to predict targets (20% as validation), and show validation accuracy"
# requirement = "Run EDA and visualization on this dataset, train a model to predict survival, report metrics on validation set (20%), dataset: workspace/titanic/train.csv"
async def main(requirement: str = requirement, auto_run: bool = True):
role = MLEngineer(goal=requirement, auto_run=auto_run)
# make udfs
role.make_udfs = True
role.use_udfs = False
await role.run(requirement)
# use udfs
role.reset()
role.make_udfs = False
role.use_udfs = True
await role.run(requirement)
# async def main(requirement: str = requirement, auto_run: bool = True):
# role = MLEngineer(goal=requirement, auto_run=auto_run)
# # make udfs
# role.make_udfs = True
# role.use_udfs = False
# await role.run(requirement)
# # use udfs
# role.reset()
# role.make_udfs = False
# role.use_udfs = True
# await role.run(requirement)
# requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy."
# data_path = f"{DATA_PATH}/titanic"
# requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."
# requirement = f"Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy"
# data_path = f"{DATA_PATH}/icr-identify-age-related-conditions"
# requirement = f"This is a medical dataset with over fifty anonymized health characteristics linked to three age-related conditions. Your goal is to predict whether a subject has or has not been diagnosed with one of these conditions.The target column is Class. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report f1 score on the eval data. Train data path: {data_path}/split_train.csv, eval data path: {data_path}/split_eval.csv."
# data_path = f"{DATA_PATH}/santander-customer-transaction-prediction"
# requirement = f"This is a customers financial dataset. Your goal is to predict which customers will make a specific transaction in the future. The target column is target. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report F1 Score on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv' ."
data_path = f"{DATA_PATH}/house-prices-advanced-regression-techniques"
requirement = f"This is a house price dataset, your goal is to predict the sale price of a property based on its features. The target column is SalePrice. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report RMSE between the logarithm of the predicted value and the logarithm of the observed sales price on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'."
save_dir = ""
# save_dir = DATA_PATH / "output" / "2023-12-14_20-40-34"
async def main(requirement: str = requirement, auto_run: bool = True, save_dir: str = save_dir):
"""
The main function to run the MLEngineer with optional history loading.
Args:
requirement (str): The requirement for the MLEngineer.
auto_run (bool): Whether to auto-run the MLEngineer.
save_dir (str): The directory from which to load the history or to save the new history.
Raises:
Exception: If an error occurs during execution, log the error and save the history.
"""
if save_dir:
logger.info("Resuming from history trajectory")
plan, nb = load_history(save_dir)
role = MLEngineer(goal=requirement, auto_run=auto_run)
role.plan = Plan(**plan)
role.execute_code = ExecutePyCode(nb)
else:
logger.info("Run from scratch")
role = MLEngineer(goal=requirement, auto_run=auto_run)
try:
await role.run(requirement)
except Exception as e:
save_path = save_history(role, save_dir)
logger.exception(f"An error occurred: {e}, save trajectory here: {save_path}")
fire.Fire(main)

View file

@ -0,0 +1,148 @@
import re
from typing import List
import json
from datetime import datetime
import fire
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.memory import Memory
from metagpt.logs import logger
from metagpt.actions.write_analysis_code import WriteCodeByGenerate
from metagpt.actions.ml_da_action import AskReview, ReviewConst
from metagpt.actions.execute_code import ExecutePyCode
from metagpt.roles.kaggle_manager import DownloadData
from metagpt.utils.save_code import save_code_file
STRUCTURAL_CONTEXT_SIMPLE = """
## User Requirement
{user_requirement}
## Data Description
{data_desc}
"""
JUDGE_PROMPT_TEMPLATE = """
# User Requirement
{user_requirement}
-----
# Context
{context}
-----
# State
Output "Ture" or "False". Judging from the code perspective, whether the user's needs have been completely fulfilled.
=====
# Output State("Ture" or "False") firstly, then output Thought and Next Steps for the code requirements based on the context respectively in one sentence
State:
Thought:
Next Steps:
"""
class MLEngineerSimple(Role):
def __init__(
self, name="ABC", profile="MLEngineerSimple", goal="", auto_run: bool = False
):
super().__init__(name=name, profile=profile, goal=goal)
self._set_react_mode(react_mode="react")
self._watch([DownloadData])
self._init_actions([WriteCodeByGenerate, ExecutePyCode])
self.goal = goal
self.data_desc = ""
self.use_tools = False
self.use_code_steps = False
self.execute_code = ExecutePyCode()
self.auto_run = auto_run
# memory for working on each task, discarded each time a task is done
self.working_memory = Memory()
async def _act(self):
memories = self.get_memories()
if memories:
latest_event = memories[-1].cause_by
if latest_event == DownloadData:
self.data_desc = memories[-1].content
await self._act_no_plan()
# save code using datetime.now or keywords related to the goal of your project (plan.goal).
project_record = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb")
async def _act_no_plan(self, max_retry: int = 20):
counter = 0
state = False
while not state and counter < max_retry:
context = self.get_useful_memories()
print(f"memories数量{len(context)}")
# print("===\n" +str(context) + "\n===")
code = await WriteCodeByGenerate().run(
context=context, temperature=0.0
)
cause_by = WriteCodeByGenerate
self.working_memory.add(
Message(content=code, role="assistant", cause_by=cause_by)
)
result, success = await self.execute_code.run(code)
print(result)
self.working_memory.add(
Message(content=result, role="user", cause_by=ExecutePyCode)
)
if "!pip" in code:
success = False
counter += 1
if not success and counter >= max_retry:
logger.info("coding failed!")
review, _ = await self._ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER)
if ReviewConst.CHANGE_WORD[0] in review:
counter = 0 # redo the task again with help of human suggestions
completed_plan_memory = self.get_useful_memories() # completed plan as a outcome
self._rc.memory.add(completed_plan_memory[0]) # add to persistent memory
prompt = JUDGE_PROMPT_TEMPLATE.format(user_requirement=self.goal, context=completed_plan_memory)
rsp = await self._llm.aask(prompt)
self.working_memory.add(
Message(content=rsp, role="system")
)
matches = re.findall(r'\b(True|False)\b', rsp)
state = False if 'False' in matches else True
async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER):
auto_run = auto_run or self.auto_run
if not auto_run:
context = self.get_useful_memories()
review, confirmed = await AskReview().run(context=context[-5:], trigger=trigger)
if not confirmed:
self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
return review, confirmed
return "", True
def get_useful_memories(self) -> List[Message]:
"""find useful memories only to reduce context length and improve performance"""
user_requirement = self.goal
context = STRUCTURAL_CONTEXT_SIMPLE.format(
user_requirement=user_requirement, data_desc=self.data_desc
)
context_msg = [Message(content=context, role="user")]
return context_msg + self.get_working_memories(6)
def get_working_memories(self, num=0) -> List[Message]:
return self.working_memory.get(num) # 默认为6
if __name__ == "__main__":
requirement = "Run data analysis on sklearn Iris dataset, include a plot"
async def main(requirement: str = requirement, auto_run: bool = True):
role = MLEngineerSimple(goal=requirement, auto_run=auto_run)
await role.run(requirement)
fire.Fire(main)

View file

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
# @Date : 12/20/2023 11:07 AM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import nbformat
from pathlib import Path
import json
from datetime import datetime
from metagpt.roles.role import Role
from metagpt.roles.ml_engineer import MLEngineer
from metagpt.const import DATA_PATH
from metagpt.utils.save_code import save_code_file
def load_history(save_dir: str = ""):
"""
Load history from the specified save directory.
Args:
save_dir (str): The directory from which to load the history.
Returns:
Tuple: A tuple containing the loaded plan and notebook.
"""
plan_path = Path(save_dir) / "plan.json"
nb_path = Path(save_dir) / "history_nb" / "code.ipynb"
plan = json.load(open(plan_path, "r", encoding="utf-8"))
nb = nbformat.read(open(nb_path, "r", encoding="utf-8"), as_version=nbformat.NO_CONVERT)
return plan, nb
def save_history(role: Role = MLEngineer, save_dir: str = ""):
"""
Save history to the specified directory.
Args:
role (Role): The role containing the plan and execute_code attributes.
save_dir (str): The directory to save the history.
Returns:
Path: The path to the saved history directory.
"""
record_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
save_path = DATA_PATH / "output" / f"{record_time}"
# overwrite exist trajectory
save_path.mkdir(parents=True, exist_ok=True)
plan = role.plan.dict()
with open(save_path / "plan.json", "w", encoding="utf-8") as plan_file:
json.dump(plan, plan_file, indent=4, ensure_ascii=False)
save_code_file(name=Path(record_time) / "history_nb", code_context=role.execute_code.nb, file_format="ipynb")
return save_path