diff --git a/.gitignore b/.gitignore index f3c674351..1a517d027 100644 --- a/.gitignore +++ b/.gitignore @@ -165,4 +165,3 @@ tmp output.wav metagpt/roles/idea_agent.py .aider* -/tests/metagpt/actions/check_data.py diff --git a/metagpt/actions/execute_code.py b/metagpt/actions/execute_code.py index 36e01ed0e..6e4a6fd6e 100644 --- a/metagpt/actions/execute_code.py +++ b/metagpt/actions/execute_code.py @@ -46,9 +46,12 @@ class ExecuteCode(ABC): class ExecutePyCode(ExecuteCode, Action): """execute code, return result to llm, and display it.""" - def __init__(self, name: str = "python_executor", context=None, llm=None): + def __init__(self, name: str = "python_executor", context=None, llm=None, nb=None): super().__init__(name, context, llm) - self.nb = nbformat.v4.new_notebook() + if nb is None: + self.nb = nbformat.v4.new_notebook() + else: + self.nb = nb self.nb_client = NotebookClient(self.nb) self.console = Console() self.interaction = "ipython" if self.is_ipython() else "terminal" diff --git a/metagpt/actions/write_analysis_code.py b/metagpt/actions/write_analysis_code.py index e50c069f0..02aba0e62 100644 --- a/metagpt/actions/write_analysis_code.py +++ b/metagpt/actions/write_analysis_code.py @@ -210,6 +210,8 @@ class WriteCodeWithTools(BaseWriteAnalysisCode): module_name=module_name, tool_catalog=tool_catalog, ) + + else: prompt = GENERATE_CODE_PROMPT.format( user_requirement=plan.goal, diff --git a/metagpt/actions/write_code_steps.py b/metagpt/actions/write_code_steps.py index 3c08adc19..79f3e5902 100644 --- a/metagpt/actions/write_code_steps.py +++ b/metagpt/actions/write_code_steps.py @@ -106,7 +106,7 @@ class WriteCodeSteps(Action): def process_task(task): task_dict = task.dict() # ptask = {k: task_dict[k] for k in task_dict if k in select_task_keys } - ptask = f"task_id_{task_dict['task_id']}:{task_dict['instruction']}\n" + ptask = f"task_id_{task_dict['task_id']}:{task_dict['instruction']}" return ptask diff --git a/metagpt/roles/ml_engineer.py 
b/metagpt/roles/ml_engineer.py index 9fa12b41d..5d514a18f 100644 --- a/metagpt/roles/ml_engineer.py +++ b/metagpt/roles/ml_engineer.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List import json from datetime import datetime @@ -25,6 +25,7 @@ from metagpt.roles.kaggle_manager import DownloadData, SubmitResult from metagpt.schema import Message, Plan from metagpt.utils.common import remove_comments, create_func_config from metagpt.utils.save_code import save_code_file +from metagpt.utils.recovery_util import save_history, load_history class UpdateDataColumns(Action): @@ -45,21 +46,21 @@ class MLEngineer(Role): super().__init__(name=name, profile=profile, goal=goal) self._set_react_mode(react_mode="plan_and_act") self._watch([DownloadData, SubmitResult]) - + self.plan = Plan(goal=goal) - self.use_tools = False self.make_udfs = False # user-defined functions self.use_udfs = False - self.use_code_steps = False + self.use_tools = True + self.use_code_steps = True self.execute_code = ExecutePyCode() self.auto_run = auto_run self.data_desc = {} - + # memory for working on each task, discarded each time a task is done self.working_memory = Memory() - + async def _plan_and_act(self): - + ### Actions in a multi-agent multi-turn setting ### memories = self.get_memories() if memories: @@ -69,29 +70,29 @@ class MLEngineer(Role): elif latest_event == SubmitResult: # self reflect on previous plan outcomes and think about how to improve the plan, add to working memory await self._reflect() - + # get feedback for improvement from human, add to working memory await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - + ### Common Procedure in both single- and multi-agent setting ### # create initial plan and update until confirmation await self._update_plan() - + while self.plan.current_task: task = self.plan.current_task logger.info(f"ready to take on task {task}") - + # take on current task code, result, success = await self._write_and_exec_code() - + # 
ask for acceptance, users can other refuse and change tasks in the plan review, task_result_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - + if self.auto_run: # if human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds; # if auto mode, then the code run has to succeed for the task to be considered completed task_result_confirmed = success - + if task_result_confirmed: # tick off this task and record progress task.code = code @@ -103,9 +104,9 @@ class MLEngineer(Role): success, new_code = await self._update_data_columns() if success: task.code = task.code + "\n\n" + new_code - + confirmed_and_more = (ReviewConst.CONTINUE_WORD[0] in review.lower() - and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)" + and review.lower() not in ReviewConst.CONTINUE_WORD[0]) # "confirm, ... (more content, such as changing downstream tasks)" if confirmed_and_more: self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) await self._update_plan(review) @@ -114,23 +115,23 @@ class MLEngineer(Role): # Ask the Role to redo this task with help of review feedback, # useful when the code run is successful but the procedure or result is not what we want continue - + else: # update plan according to user's feedback and to take on changed tasks await self._update_plan(review) - + completed_plan_memory = self.get_useful_memories() # completed plan as a outcome self._rc.memory.add(completed_plan_memory[0]) # add to persistent memory - + summary = await SummarizeAnalysis().run(self.plan) rsp = Message(content=summary, cause_by=SummarizeAnalysis) self._rc.memory.add(rsp) - + # save code using datetime.now or keywords related to the goal of your project (plan.goal). 
project_record = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb") return rsp - + async def _update_data_columns(self): rsp = await UpdateDataColumns().run(self.plan) is_update, code = rsp["is_update"], rsp["code"] @@ -140,25 +141,20 @@ class MLEngineer(Role): if success: self.data_desc["column_info"] = result return success, code - + async def _write_and_exec_code(self, max_retry: int = 3): self.plan.current_task.code_steps = ( await WriteCodeSteps().run(self.plan) if self.use_code_steps else "" ) - + counter = 0 success = False debug_context = [] - + while not success and counter < max_retry: context = self.get_useful_memories() - - # print("*" * 10) - # print(context) - # print("*" * 10) - # breakpoint() if counter > 0 and (self.use_tools or self.use_udfs): logger.warning('We got a bug code, now start to debug...') code = await DebugCode().run( @@ -203,11 +199,11 @@ class MLEngineer(Role): ) debug_context = tool_context cause_by = WriteCodeWithTools - + self.working_memory.add( Message(content=code, role="assistant", cause_by=cause_by) ) - + result, success = await self.execute_code.run(code) print(result) # make tools for successful code and long code. 
@@ -217,20 +213,20 @@ class MLEngineer(Role): self.working_memory.add( Message(content=result, role="user", cause_by=ExecutePyCode) ) - + if "!pip" in code: success = False - + counter += 1 - + if not success and counter >= max_retry: logger.info("coding failed!") review, _ = await self._ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) if ReviewConst.CHANGE_WORD[0] in review: counter = 0 # redo the task again with help of human suggestions - + return code, result, success - + async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER): auto_run = auto_run or self.auto_run if not auto_run: @@ -240,7 +236,7 @@ class MLEngineer(Role): self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) return review, confirmed return "", True - + async def _update_plan(self, review: str = "", max_tasks: int = 3, max_retries: int = 3): plan_confirmed = False while not plan_confirmed: @@ -251,7 +247,7 @@ class MLEngineer(Role): self.working_memory.add( Message(content=rsp, role="assistant", cause_by=WritePlan) ) - + # precheck plan before asking reviews is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan) if not is_plan_valid and max_retries > 0: @@ -260,23 +256,21 @@ class MLEngineer(Role): self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan)) max_retries -= 1 continue - + _, plan_confirmed = await self._ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) - + update_plan_from_rsp(rsp, self.plan) - + self.working_memory.clear() async def _reflect(self): context = self.get_memories() context = "\n".join([str(msg) for msg in context]) - # print("*" * 10) - # print(context) - # print("*" * 10) + reflection = await Reflect().run(context=context) self.working_memory.add(Message(content=reflection, role="assistant")) self.working_memory.add(Message(content=Reflect.REWRITE_PLAN_INSTRUCTION, role="user")) - + def get_useful_memories(self, 
task_exclude_field=None) -> List[Message]: """find useful memories only to reduce context length and improve performance""" # TODO dataset description , code steps @@ -293,7 +287,7 @@ class MLEngineer(Role): user_requirement=user_requirement, data_desc=data_desc, tasks=tasks, current_task=current_task ) context_msg = [Message(content=context, role="user")] - + return context_msg + self.get_working_memories() def get_working_memories(self) -> List[Message]: @@ -334,22 +328,74 @@ class MLEngineer(Role): if __name__ == "__main__": - requirement = "Run data analysis on sklearn Iris dataset, include a plot" + # requirement = "Run data analysis on sklearn Iris dataset, include a plot" # requirement = "Run data analysis on sklearn Diabetes dataset, include a plot" # requirement = "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy" # requirement = "Run data analysis on sklearn Wisconsin Breast Cancer dataset, include a plot, train a model to predict targets (20% as validation), and show validation accuracy" # requirement = "Run EDA and visualization on this dataset, train a model to predict survival, report metrics on validation set (20%), dataset: workspace/titanic/train.csv" - async def main(requirement: str = requirement, auto_run: bool = True): - role = MLEngineer(goal=requirement, auto_run=auto_run) - # make udfs - role.make_udfs = True - role.use_udfs = False - await role.run(requirement) - # use udfs - role.reset() - role.make_udfs = False - role.use_udfs = True - await role.run(requirement) + # async def main(requirement: str = requirement, auto_run: bool = True): + # role = MLEngineer(goal=requirement, auto_run=auto_run) + # # make udfs + # role.make_udfs = True + # role.use_udfs = False + # await role.run(requirement) + # # use udfs + # role.reset() + # role.make_udfs = False + # role.use_udfs = True + # await role.run(requirement) + + # requirement = 
"Perform data analysis on the provided data. Train a model to predict the target variable Survived. Include data preprocessing, feature engineering, and modeling in your pipeline. The metric is accuracy." + + # data_path = f"{DATA_PATH}/titanic" + # requirement = f"This is a titanic passenger survival dataset, your goal is to predict passenger survival outcome. The target column is Survived. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report accuracy on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'." + # requirement = f"Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy" + # data_path = f"{DATA_PATH}/icr-identify-age-related-conditions" + # requirement = f"This is a medical dataset with over fifty anonymized health characteristics linked to three age-related conditions. Your goal is to predict whether a subject has or has not been diagnosed with one of these conditions.The target column is Class. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report f1 score on the eval data. Train data path: {data_path}/split_train.csv, eval data path: {data_path}/split_eval.csv." + + # data_path = f"{DATA_PATH}/santander-customer-transaction-prediction" + # requirement = f"This is a customers financial dataset. Your goal is to predict which customers will make a specific transaction in the future. The target column is target. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report F1 Score on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv' ." 
+ + data_path = f"{DATA_PATH}/house-prices-advanced-regression-techniques" + requirement = f"This is a house price dataset, your goal is to predict the sale price of a property based on its features. The target column is SalePrice. Perform data analysis, data preprocessing, feature engineering, and modeling to predict the target. Report RMSE between the logarithm of the predicted value and the logarithm of the observed sales price on the eval data. Train data path: '{data_path}/split_train.csv', eval data path: '{data_path}/split_eval.csv'." + + save_dir = "" + + + # save_dir = DATA_PATH / "output" / "2023-12-14_20-40-34" + + async def main(requirement: str = requirement, auto_run: bool = True, save_dir: str = save_dir): + """ + The main function to run the MLEngineer with optional history loading. + + Args: + requirement (str): The requirement for the MLEngineer. + auto_run (bool): Whether to auto-run the MLEngineer. + save_dir (str): The directory from which to load the history or to save the new history. + + Raises: + Exception: If an error occurs during execution, log the error and save the history. 
+ """ + if save_dir: + logger.info("Resuming from history trajectory") + plan, nb = load_history(save_dir) + role = MLEngineer(goal=requirement, auto_run=auto_run) + role.plan = Plan(**plan) + role.execute_code = ExecutePyCode(nb) + + else: + logger.info("Run from scratch") + role = MLEngineer(goal=requirement, auto_run=auto_run) + + try: + await role.run(requirement) + except Exception as e: + + save_path = save_history(role, save_dir) + + logger.exception(f"An error occurred: {e}, save trajectory here: {save_path}") + + fire.Fire(main) diff --git a/metagpt/roles/ml_engineer_simple.py b/metagpt/roles/ml_engineer_simple.py new file mode 100644 index 000000000..cc7d8fc97 --- /dev/null +++ b/metagpt/roles/ml_engineer_simple.py @@ -0,0 +1,148 @@ +import re +from typing import List +import json +from datetime import datetime + +import fire + +from metagpt.roles import Role +from metagpt.schema import Message +from metagpt.memory import Memory +from metagpt.logs import logger +from metagpt.actions.write_analysis_code import WriteCodeByGenerate +from metagpt.actions.ml_da_action import AskReview, ReviewConst +from metagpt.actions.execute_code import ExecutePyCode +from metagpt.roles.kaggle_manager import DownloadData +from metagpt.utils.save_code import save_code_file + +STRUCTURAL_CONTEXT_SIMPLE = """ +## User Requirement +{user_requirement} +## Data Description +{data_desc} +""" + +JUDGE_PROMPT_TEMPLATE = """ +# User Requirement +{user_requirement} +----- +# Context +{context} +----- +# State +Output "True" or "False". Judging from the code perspective, whether the user's needs have been completely fulfilled. 
+===== +# Output State("True" or "False") firstly, then output Thought and Next Steps for the code requirements based on the context respectively in one sentence +State: +Thought: +Next Steps: +""" + + +class MLEngineerSimple(Role): + def __init__( + self, name="ABC", profile="MLEngineerSimple", goal="", auto_run: bool = False + ): + super().__init__(name=name, profile=profile, goal=goal) + self._set_react_mode(react_mode="react") + self._watch([DownloadData]) + self._init_actions([WriteCodeByGenerate, ExecutePyCode]) + + self.goal = goal + self.data_desc = "" + self.use_tools = False + self.use_code_steps = False + self.execute_code = ExecutePyCode() + self.auto_run = auto_run + + # memory for working on each task, discarded each time a task is done + self.working_memory = Memory() + + async def _act(self): + memories = self.get_memories() + if memories: + latest_event = memories[-1].cause_by + if latest_event == DownloadData: + self.data_desc = memories[-1].content + + await self._act_no_plan() + + # save code using datetime.now or keywords related to the goal of your project (plan.goal). 
+ project_record = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + save_code_file(name=project_record, code_context=self.execute_code.nb, file_format="ipynb") + + async def _act_no_plan(self, max_retry: int = 20): + counter = 0 + state = False + while not state and counter < max_retry: + context = self.get_useful_memories() + print(f"memories数量:{len(context)}") + # print("===\n" +str(context) + "\n===") + code = await WriteCodeByGenerate().run( + context=context, temperature=0.0 + ) + cause_by = WriteCodeByGenerate + self.working_memory.add( + Message(content=code, role="assistant", cause_by=cause_by) + ) + + result, success = await self.execute_code.run(code) + print(result) + self.working_memory.add( + Message(content=result, role="user", cause_by=ExecutePyCode) + ) + + if "!pip" in code: + success = False + + counter += 1 + + if not success and counter >= max_retry: + logger.info("coding failed!") + review, _ = await self._ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) + if ReviewConst.CHANGE_WORD[0] in review: + counter = 0 # redo the task again with help of human suggestions + + completed_plan_memory = self.get_useful_memories() # completed plan as a outcome + self._rc.memory.add(completed_plan_memory[0]) # add to persistent memory + prompt = JUDGE_PROMPT_TEMPLATE.format(user_requirement=self.goal, context=completed_plan_memory) + rsp = await self._llm.aask(prompt) + self.working_memory.add( + Message(content=rsp, role="system") + ) + + matches = re.findall(r'\b(True|False)\b', rsp) + state = False if 'False' in matches else True + + async def _ask_review(self, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER): + auto_run = auto_run or self.auto_run + if not auto_run: + context = self.get_useful_memories() + review, confirmed = await AskReview().run(context=context[-5:], trigger=trigger) + if not confirmed: + self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) + return review, confirmed + 
return "", True + + def get_useful_memories(self) -> List[Message]: + """find useful memories only to reduce context length and improve performance""" + user_requirement = self.goal + context = STRUCTURAL_CONTEXT_SIMPLE.format( + user_requirement=user_requirement, data_desc=self.data_desc + ) + context_msg = [Message(content=context, role="user")] + + return context_msg + self.get_working_memories(6) + + def get_working_memories(self, num=0) -> List[Message]: + return self.working_memory.get(num) # 默认为6 + + +if __name__ == "__main__": + requirement = "Run data analysis on sklearn Iris dataset, include a plot" + + async def main(requirement: str = requirement, auto_run: bool = True): + role = MLEngineerSimple(goal=requirement, auto_run=auto_run) + await role.run(requirement) + + fire.Fire(main) diff --git a/metagpt/utils/recovery_util.py b/metagpt/utils/recovery_util.py new file mode 100644 index 000000000..ef4f0aca7 --- /dev/null +++ b/metagpt/utils/recovery_util.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# @Date : 12/20/2023 11:07 AM +# @Author : stellahong (stellahong@fuzhi.ai) +# @Desc : +import nbformat +from pathlib import Path +import json +from datetime import datetime + +from metagpt.roles.role import Role +from metagpt.roles.ml_engineer import MLEngineer +from metagpt.const import DATA_PATH +from metagpt.utils.save_code import save_code_file + +def load_history(save_dir: str = ""): + """ + Load history from the specified save directory. + + Args: + save_dir (str): The directory from which to load the history. + + Returns: + Tuple: A tuple containing the loaded plan and notebook. 
+ """ + + plan_path = Path(save_dir) / "plan.json" + nb_path = Path(save_dir) / "history_nb" / "code.ipynb" + plan = json.load(open(plan_path, "r", encoding="utf-8")) + nb = nbformat.read(open(nb_path, "r", encoding="utf-8"), as_version=nbformat.NO_CONVERT) + return plan, nb + + +def save_history(role: Role = MLEngineer, save_dir: str = ""): + """ + Save history to the specified directory. + + Args: + role (Role): The role containing the plan and execute_code attributes. + save_dir (str): The directory to save the history. + + Returns: + Path: The path to the saved history directory. + """ + record_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + save_path = DATA_PATH / "output" / f"{record_time}" + + # overwrite exist trajectory + save_path.mkdir(parents=True, exist_ok=True) + + plan = role.plan.dict() + + with open(save_path / "plan.json", "w", encoding="utf-8") as plan_file: + json.dump(plan, plan_file, indent=4, ensure_ascii=False) + + save_code_file(name=Path(record_time) / "history_nb", code_context=role.execute_code.nb, file_format="ipynb") + return save_path \ No newline at end of file