diff --git a/.gitignore b/.gitignore index 9b679d48a..e03eab3d3 100644 --- a/.gitignore +++ b/.gitignore @@ -164,4 +164,3 @@ tmp output.wav metagpt/roles/idea_agent.py .aider* -/tests/metagpt/actions/check_data.py diff --git a/metagpt/actions/execute_code.py b/metagpt/actions/execute_code.py index 36e01ed0e..6e4a6fd6e 100644 --- a/metagpt/actions/execute_code.py +++ b/metagpt/actions/execute_code.py @@ -46,9 +46,12 @@ class ExecuteCode(ABC): class ExecutePyCode(ExecuteCode, Action): """execute code, return result to llm, and display it.""" - def __init__(self, name: str = "python_executor", context=None, llm=None): + def __init__(self, name: str = "python_executor", context=None, llm=None, nb=None): super().__init__(name, context, llm) - self.nb = nbformat.v4.new_notebook() + if nb is None: + self.nb = nbformat.v4.new_notebook() + else: + self.nb = nb self.nb_client = NotebookClient(self.nb) self.console = Console() self.interaction = "ipython" if self.is_ipython() else "terminal" diff --git a/metagpt/actions/write_analysis_code.py b/metagpt/actions/write_analysis_code.py index 6970fb4f0..ecbb68122 100644 --- a/metagpt/actions/write_analysis_code.py +++ b/metagpt/actions/write_analysis_code.py @@ -201,6 +201,8 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): module_name=module_name, tool_catalog=tool_catalog, ) + + else: prompt = GENERATE_CODE_PROMPT.format( user_requirement=plan.goal, diff --git a/metagpt/actions/write_code_steps.py b/metagpt/actions/write_code_steps.py index 3c08adc19..79f3e5902 100644 --- a/metagpt/actions/write_code_steps.py +++ b/metagpt/actions/write_code_steps.py @@ -106,7 +106,7 @@ class WriteCodeSteps(Action): def process_task(task): task_dict = task.dict() # ptask = {k: task_dict[k] for k in task_dict if k in select_task_keys } - ptask = f"task_id_{task_dict['task_id']}:{task_dict['instruction']}\n" + ptask = f"task_id_{task_dict['task_id']}:{task_dict['instruction']}" return ptask diff --git a/metagpt/roles/ml_engineer.py b/metagpt/roles/ml_engineer.py index 3755e7bac..33b570d1a 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List import json from datetime import datetime @@ -25,6 +25,7 @@ from metagpt.roles.kaggle_manager import DownloadData, SubmitResult from metagpt.schema import Message, Plan from metagpt.utils.common import remove_comments, create_func_config from metagpt.utils.save_code import save_code_file +from metagpt.utils.recovery_util import save_history, load_history class UpdateDataColumns(Action): @@ -45,19 +46,19 @@ class MLEngineer(Role): super().__init__(name=name, profile=profile, goal=goal) self._set_react_mode(react_mode="plan_and_act") self._watch([DownloadData, SubmitResult]) - + self.plan = Plan(goal=goal) - self.use_tools = False - self.use_code_steps = False + self.use_tools = True + self.use_code_steps = True self.execute_code = ExecutePyCode() self.auto_run = auto_run self.data_desc = {} - + # memory for working on each task, discarded each time a task is done self.working_memory = Memory() - + async def _plan_and_act(self): - + ### Actions in a multi-agent multi-turn setting ### memories = self.get_memories() if memories: @@ -67,43 +68,43 @@ class MLEngineer(Role): elif latest_event == SubmitResult: # self reflect on previous plan outcomes and think about how to improve the plan, add to working memory await self._reflect() - + # get feedback for improvement from human, add to working memory await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - + ### Common Procedure in both single- and multi-agent setting ### # create initial plan and update until confirmation await self._update_plan() - + while self.plan.current_task: task = self.plan.current_task logger.info(f"ready to take on task {task}") - + # take on current task code, result, success = await self._write_and_exec_code() - + # ask for acceptance, users can other refuse and change tasks in the plan review, task_result_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - + if self.auto_run: # if human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds; # if auto mode, then the code run has to succeed for the task to be considered completed task_result_confirmed = success - + if task_result_confirmed: # tick off this task and record progress task.code = code task.result = result self.plan.finish_current_task() self.working_memory.clear() - + if self.use_tools: success, new_code = await self._update_data_columns() if success: task.code = task.code + "\n\n" + new_code - + confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower() - and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)" + and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)" if confirmed_and_more: self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) await self._update_plan(review) @@ -112,23 +113,23 @@ class MLEngineer(Role): # Ask the Role to redo this task with help of review feedback, # useful when the code run is successful but the procedure or result is not what we want continue - + else: # update plan according to user's feedback and to take on changed tasks await self._update_plan(review) - + completed_plan_memory = self.get_useful_memories() # completed plan as a outcome self._rc.memory.add(completed_plan_memory[0]) # add to persistent memory - + summary = await SummarizeAnalysis().run(self.plan) rsp = Message(content=summary, cause_by=SummarizeAnalysis) self._rc.memory.add(rsp) - + # save code using datetime.now or keywords related to the goal of your project (plan.goal). project_record = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb") return rsp - + async def _update_data_columns(self): rsp = await UpdateDataColumns().run(self.plan) is_update, code = rsp["is_update"], rsp["code"] @@ -138,25 +139,21 @@ class MLEngineer(Role): if success: self.data_desc["column_info"] = result return success, code - + async def _write_and_exec_code(self, max_retry: int = 3): self.plan.current_task.code_steps = ( await WriteCodeSteps().run(self.plan) if self.use_code_steps else "" ) - + counter = 0 success = False debug_context = [] - + while not success and counter < max_retry: context = self.get_useful_memories() - - # print("*" * 10) - # print(context) - # print("*" * 10) - # breakpoint() + if counter > 0 and self.use_tools: code = await DebugCode().run( plan=self.plan.current_task.instruction, @@ -183,30 +180,30 @@ class MLEngineer(Role): ) debug_context = tool_context cause_by = WriteCodeWithTools - + self.working_memory.add( Message(content=code, role="assistant", cause_by=cause_by) ) - + result, success = await self.execute_code.run(code) print(result) self.working_memory.add( Message(content=result, role="user", cause_by=ExecutePyCode) ) - + if "!pip" in code: success = False - + counter += 1 - + if not success and counter >= max_retry: logger.info("coding failed!") review, _ = await self._ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) if ReviewConst.CHANGE_WORD[0] in review: counter = 0 # redo the task again with help of human suggestions - + return code, result, success - + async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER): auto_run = auto_run or self.auto_run if not auto_run: @@ -216,7 +213,7 @@ class MLEngineer(Role): self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) return review, confirmed return "", True - + async def _update_plan(self, review: str = "", max_tasks: int = 3, max_retries: int = 3): plan_confirmed = False while not plan_confirmed: @@ -227,7 +224,7 @@ class MLEngineer(Role): self.working_memory.add( Message(content=rsp, role="assistant", cause_by=WritePlan) ) - + # precheck plan before asking reviews is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan) if not is_plan_valid and max_retries > 0: @@ -236,23 +233,21 @@ class MLEngineer(Role): self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan)) max_retries -= 1 continue - + _, plan_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - + update_plan_from_rsp(rsp, self.plan) - + self.working_memory.clear() async def _reflect(self): context = self.get_memories() context = "\n".join([str(msg) for msg in context]) - # print("*" * 10) - # print(context) - # print("*" * 10) + reflection = await Reflect().run(context=context) self.working_memory.add(Message(content=reflection, role="assistant")) self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user")) - + def get_useful_memories(self, task_exclude_field=None) -> List[Message]: """find useful memories only to reduce context length and improve performance""" # TODO dataset description , code steps @@ -269,7 +264,7 @@ class MLEngineer(Role): user_requirement=user_requirement, data_desc=data_desc, tasks=tasks, current_task=current_task ) context_msg = [Message(content=context, role="user")] - + return context_msg + self.get_working_memories() def get_working_memories(self) -> List[Message]: @@ -277,14 +272,61 @@ class MLEngineer(Role): if __name__ == "__main__": - requirement = "Run data analysis on sklearn Iris dataset, include a plot" + # requirement = "Run data analysis on sklearn Iris dataset, include a plot" # requirement = "Run data analysis on sklearn Diabetes dataset, include a plot" # requirement = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy" # requirement = "Run data analysis on sklearn Wisconsin Breast Cancer dataset, include a plot, train a model to predict targets (20% as validation), and show validation accuracy" # requirement = "Run EDA and visualization on this dataset, train a model to predict survival, report metrics on validation set (20%), dataset: workspace/titanic/train.csv" + + # requirement = "Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy." + + # data_path = f"{DATA_PATH}/titanic" + # requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'." + # requirement = f"Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy" + # data_path = f"{DATA_PATH}/icr-identify-age-related-conditions" + # requirement = f"This is a medical dataset with over fifty anonymized health characteristics linked to three age-related conditions. Your goal is to predict whether a subject has or has not been diagnosed with one of these conditions.The target column is Class. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report f1 score on the eval data. Train data path: {data_path}/split_train.csv, eval data path: {data_path}/split_eval.csv." + + # data_path = f"{DATA_PATH}/santander-customer-transaction-prediction" + # requirement = f"This is a customers financial dataset. Your goal is to predict which customers will make a specific transaction in the future. The target column is target. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report F1 Score on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv' ." + + data_path = f"{DATA_PATH}/house-prices-advanced-regression-techniques" + requirement = f"This is a house price dataset, your goal is to predict the sale price of a property based on its features. The target column is SalePrice. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report RMSE between the logarithm of the predicted value and the logarithm of the observed sales price on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'." + + save_dir = "" + + + # save_dir = DATA_PATH / "output" / "2023-12-14_20-40-34" + + async def main(requirement: str = requirement, auto_run: bool = True, save_dir: str = save_dir): + """ + The main function to run the MLEngineer with optional history loading. - async def main(requirement: str = requirement, auto_run: bool = True): - role = MLEngineer(goal=requirement, auto_run=auto_run) - await role.run(requirement) + Args: + requirement (str): The requirement for the MLEngineer. + auto_run (bool): Whether to auto-run the MLEngineer. + save_dir (str): The directory from which to load the history or to save the new history. + Raises: + Exception: If an error occurs during execution, log the error and save the history. + """ + if save_dir: + logger.info("Resuming from history trajectory") + plan, nb = load_history(save_dir) + role = MLEngineer(goal=requirement, auto_run=auto_run) + role.plan = Plan(**plan) + role.execute_code = ExecutePyCode(nb) + + else: + logger.info("Run from scratch") + role = MLEngineer(goal=requirement, auto_run=auto_run) + + try: + await role.run(requirement) + except Exception as e: + + save_path = save_history(role, save_dir) + + logger.exception(f"An error occurred: {e}, save trajectory here: {save_path}") + + fire.Fire(main) diff --git a/metagpt/utils/recovery_util.py b/metagpt/utils/recovery_util.py new file mode 100644 index 000000000..ef4f0aca7 --- /dev/null +++ b/metagpt/utils/recovery_util.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# @Date : 12/20/2023 11:07 AM +# @Author : stellahong (stellahong@fuzhi.ai) +# @Desc : +import nbformat +from pathlib import Path +import json +from datetime import datetime + +from metagpt.roles.role import Role +from metagpt.roles.ml_engineer import MLEngineer +from metagpt.const import DATA_PATH +from metagpt.utils.save_code import save_code_file + +def load_history(save_dir: str = ""): + """ + Load history from the specified save directory. + + Args: + save_dir (str): The directory from which to load the history. + + Returns: + Tuple: A tuple containing the loaded plan and notebook. + """ + + plan_path = Path(save_dir) / "plan.json" + nb_path = Path(save_dir) / "history_nb" / "code.ipynb" + plan = json.load(open(plan_path, "r", encoding="utf-8")) + nb = nbformat.read(open(nb_path, "r", encoding="utf-8"), as_version=nbformat.NO_CONVERT) + return plan, nb + + +def save_history(role: Role = MLEngineer, save_dir: str = ""): + """ + Save history to the specified directory. + + Args: + role (Role): The role containing the plan and execute_code attributes. + save_dir (str): The directory to save the history. + + Returns: + Path: The path to the saved history directory. + """ + record_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + save_path = DATA_PATH / "output" / f"{record_time}" + + # overwrite exist trajectory + save_path.mkdir(parents=True, exist_ok=True) + + plan = role.plan.dict() + + with open(save_path / "plan.json", "w", encoding="utf-8") as plan_file: + json.dump(plan, plan_file, indent=4, ensure_ascii=False) + + save_code_file(name=Path(record_time) / "history_nb", code_context=role.execute_code.nb, file_format="ipynb") + return save_path \ No newline at end of file